diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-15 16:37:08 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-17 16:30:22 -0600 |
| commit | 45df4d0d9b577fecee798d672695fe24ff57fb1b (patch) | |
| tree | 1b99bf645035b58e0d6db08c7a83521f41f7a75b /vendor/hyper/src/proto/h2 | |
| parent | f94f79608393d4ab127db63cc41668445ef6b243 (diff) | |
feat: migrate from Cedar to SpiceDB authorization system
This is a major architectural change that replaces the Cedar policy-based
authorization system with SpiceDB's relation-based authorization.
Key changes:
- Migrate from Rust to Go implementation
- Replace Cedar policies with SpiceDB schema and relationships
- Switch from envoy `ext_authz` with Cedar to SpiceDB permission checks
- Update build system and dependencies for Go ecosystem
- Maintain Envoy integration for external authorization
This change enables more flexible permission modeling through SpiceDB's
Google Zanzibar inspired relation-based system, supporting complex
hierarchical permissions that were difficult to express in Cedar.
Breaking change: Existing Cedar policies and Rust-based configuration
will no longer work and need to be migrated to SpiceDB schema.
Diffstat (limited to 'vendor/hyper/src/proto/h2')
| -rw-r--r-- | vendor/hyper/src/proto/h2/client.rs | 749 | ||||
| -rw-r--r-- | vendor/hyper/src/proto/h2/mod.rs | 446 | ||||
| -rw-r--r-- | vendor/hyper/src/proto/h2/ping.rs | 509 | ||||
| -rw-r--r-- | vendor/hyper/src/proto/h2/server.rs | 545 |
4 files changed, 0 insertions, 2249 deletions
diff --git a/vendor/hyper/src/proto/h2/client.rs b/vendor/hyper/src/proto/h2/client.rs deleted file mode 100644 index 5e9641e4..00000000 --- a/vendor/hyper/src/proto/h2/client.rs +++ /dev/null @@ -1,749 +0,0 @@ -use std::{ - convert::Infallible, - future::Future, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use crate::rt::{Read, Write}; -use bytes::Bytes; -use futures_channel::mpsc::{Receiver, Sender}; -use futures_channel::{mpsc, oneshot}; -use futures_util::future::{Either, FusedFuture, FutureExt as _}; -use futures_util::ready; -use futures_util::stream::{StreamExt as _, StreamFuture}; -use h2::client::{Builder, Connection, SendRequest}; -use h2::SendStream; -use http::{Method, StatusCode}; -use pin_project_lite::pin_project; - -use super::ping::{Ponger, Recorder}; -use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; -use crate::body::{Body, Incoming as IncomingBody}; -use crate::client::dispatch::{Callback, SendWhen, TrySendError}; -use crate::common::io::Compat; -use crate::common::time::Time; -use crate::ext::Protocol; -use crate::headers; -use crate::proto::h2::UpgradedSendStream; -use crate::proto::Dispatched; -use crate::rt::bounds::Http2ClientConnExec; -use crate::upgrade::Upgraded; -use crate::{Request, Response}; -use h2::client::ResponseFuture; - -type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>; - -///// An mpsc channel is used to help notify the `Connection` task when *all* -///// other handles to it have been dropped, so that it can shutdown. -type ConnDropRef = mpsc::Sender<Infallible>; - -///// A oneshot channel watches the `Connection` task, and when it completes, -///// the "dispatch" task will be notified and can shutdown sooner. -type ConnEof = oneshot::Receiver<Infallible>; - -// Our defaults are chosen for the "majority" case, which usually are not -// resource constrained, and so the spec default of 64kb can be too limiting -// for performance. -const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb -const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb -const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb -const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb -const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb - -// The maximum number of concurrent streams that the client is allowed to open -// before it receives the initial SETTINGS frame from the server. -// This default value is derived from what the HTTP/2 spec recommends as the -// minimum value that endpoints advertise to their peers. It means that using -// this value will minimize the chance of the failure where the local endpoint -// attempts to open too many streams and gets rejected by the remote peer with -// the `REFUSED_STREAM` error. -const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100; - -#[derive(Clone, Debug)] -pub(crate) struct Config { - pub(crate) adaptive_window: bool, - pub(crate) initial_conn_window_size: u32, - pub(crate) initial_stream_window_size: u32, - pub(crate) initial_max_send_streams: usize, - pub(crate) max_frame_size: Option<u32>, - pub(crate) max_header_list_size: u32, - pub(crate) keep_alive_interval: Option<Duration>, - pub(crate) keep_alive_timeout: Duration, - pub(crate) keep_alive_while_idle: bool, - pub(crate) max_concurrent_reset_streams: Option<usize>, - pub(crate) max_send_buffer_size: usize, - pub(crate) max_pending_accept_reset_streams: Option<usize>, - pub(crate) header_table_size: Option<u32>, - pub(crate) max_concurrent_streams: Option<u32>, -} - -impl Default for Config { - fn default() -> Config { - Config { - adaptive_window: false, - initial_conn_window_size: DEFAULT_CONN_WINDOW, - initial_stream_window_size: DEFAULT_STREAM_WINDOW, - initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS, - max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE), - max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE, - keep_alive_interval: None, - keep_alive_timeout: Duration::from_secs(20), - keep_alive_while_idle: false, - max_concurrent_reset_streams: None, - max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, - max_pending_accept_reset_streams: None, - header_table_size: None, - max_concurrent_streams: None, - } - } -} - -fn new_builder(config: &Config) -> Builder { - let mut builder = Builder::default(); - builder - .initial_max_send_streams(config.initial_max_send_streams) - .initial_window_size(config.initial_stream_window_size) - .initial_connection_window_size(config.initial_conn_window_size) - .max_header_list_size(config.max_header_list_size) - .max_send_buffer_size(config.max_send_buffer_size) - .enable_push(false); - if let Some(max) = config.max_frame_size { - builder.max_frame_size(max); - } - if let Some(max) = config.max_concurrent_reset_streams { - builder.max_concurrent_reset_streams(max); - } - if let Some(max) = config.max_pending_accept_reset_streams { - builder.max_pending_accept_reset_streams(max); - } - if let Some(size) = config.header_table_size { - builder.header_table_size(size); - } - if let Some(max) = config.max_concurrent_streams { - builder.max_concurrent_streams(max); - } - builder -} - -fn new_ping_config(config: &Config) -> ping::Config { - ping::Config { - bdp_initial_window: if config.adaptive_window { - Some(config.initial_stream_window_size) - } else { - None - }, - keep_alive_interval: config.keep_alive_interval, - keep_alive_timeout: config.keep_alive_timeout, - keep_alive_while_idle: config.keep_alive_while_idle, - } -} - -pub(crate) async fn handshake<T, B, E>( - io: T, - req_rx: ClientRx<B>, - config: &Config, - mut exec: E, - timer: Time, -) -> crate::Result<ClientTask<B, E, T>> -where - T: Read + Write + Unpin, - B: Body + 'static, - B::Data: Send + 'static, - E: Http2ClientConnExec<B, T> + Unpin, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, -{ - let (h2_tx, mut conn) = new_builder(config) - .handshake::<_, SendBuf<B::Data>>(Compat::new(io)) - .await - .map_err(crate::Error::new_h2)?; - - // An mpsc channel is used entirely to detect when the - // 'Client' has been dropped. This is to get around a bug - // in h2 where dropping all SendRequests won't notify a - // parked Connection. - let (conn_drop_ref, rx) = mpsc::channel(1); - let (cancel_tx, conn_eof) = oneshot::channel(); - - let conn_drop_rx = rx.into_future(); - - let ping_config = new_ping_config(config); - - let (conn, ping) = if ping_config.is_enabled() { - let pp = conn.ping_pong().expect("conn.ping_pong"); - let (recorder, ponger) = ping::channel(pp, ping_config, timer); - - let conn: Conn<_, B> = Conn::new(ponger, conn); - (Either::Left(conn), recorder) - } else { - (Either::Right(conn), ping::disabled()) - }; - let conn: ConnMapErr<T, B> = ConnMapErr { - conn, - is_terminated: false, - }; - - exec.execute_h2_future(H2ClientFuture::Task { - task: ConnTask::new(conn, conn_drop_rx, cancel_tx), - }); - - Ok(ClientTask { - ping, - conn_drop_ref, - conn_eof, - executor: exec, - h2_tx, - req_rx, - fut_ctx: None, - marker: PhantomData, - }) -} - -pin_project! { - struct Conn<T, B> - where - B: Body, - { - #[pin] - ponger: Ponger, - #[pin] - conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>, - } -} - -impl<T, B> Conn<T, B> -where - B: Body, - T: Read + Write + Unpin, -{ - fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self { - Conn { ponger, conn } - } -} - -impl<T, B> Future for Conn<T, B> -where - B: Body, - T: Read + Write + Unpin, -{ - type Output = Result<(), h2::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut this = self.project(); - match this.ponger.poll(cx) { - Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { - this.conn.set_target_window_size(wnd); - this.conn.set_initial_window_size(wnd)?; - } - Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { - debug!("connection keep-alive timed out"); - return Poll::Ready(Ok(())); - } - Poll::Pending => {} - } - - Pin::new(&mut this.conn).poll(cx) - } -} - -pin_project! { - struct ConnMapErr<T, B> - where - B: Body, - T: Read, - T: Write, - T: Unpin, - { - #[pin] - conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>, - #[pin] - is_terminated: bool, - } -} - -impl<T, B> Future for ConnMapErr<T, B> -where - B: Body, - T: Read + Write + Unpin, -{ - type Output = Result<(), ()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut this = self.project(); - - if *this.is_terminated { - return Poll::Pending; - } - let polled = this.conn.poll(cx); - if polled.is_ready() { - *this.is_terminated = true; - } - polled.map_err(|_e| { - debug!(error = %_e, "connection error"); - }) - } -} - -impl<T, B> FusedFuture for ConnMapErr<T, B> -where - B: Body, - T: Read + Write + Unpin, -{ - fn is_terminated(&self) -> bool { - self.is_terminated - } -} - -pin_project! { - pub struct ConnTask<T, B> - where - B: Body, - T: Read, - T: Write, - T: Unpin, - { - #[pin] - drop_rx: StreamFuture<Receiver<Infallible>>, - #[pin] - cancel_tx: Option<oneshot::Sender<Infallible>>, - #[pin] - conn: ConnMapErr<T, B>, - } -} - -impl<T, B> ConnTask<T, B> -where - B: Body, - T: Read + Write + Unpin, -{ - fn new( - conn: ConnMapErr<T, B>, - drop_rx: StreamFuture<Receiver<Infallible>>, - cancel_tx: oneshot::Sender<Infallible>, - ) -> Self { - Self { - drop_rx, - cancel_tx: Some(cancel_tx), - conn, - } - } -} - -impl<T, B> Future for ConnTask<T, B> -where - B: Body, - T: Read + Write + Unpin, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut this = self.project(); - - if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() { - // ok or err, the `conn` has finished. - return Poll::Ready(()); - } - - if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() { - // mpsc has been dropped, hopefully polling - // the connection some more should start shutdown - // and then close. - trace!("send_request dropped, starting conn shutdown"); - drop(this.cancel_tx.take().expect("ConnTask Future polled twice")); - } - - Poll::Pending - } -} - -pin_project! { - #[project = H2ClientFutureProject] - pub enum H2ClientFuture<B, T> - where - B: http_body::Body, - B: 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - T: Read, - T: Write, - T: Unpin, - { - Pipe { - #[pin] - pipe: PipeMap<B>, - }, - Send { - #[pin] - send_when: SendWhen<B>, - }, - Task { - #[pin] - task: ConnTask<T, B>, - }, - } -} - -impl<B, T> Future for H2ClientFuture<B, T> -where - B: http_body::Body + 'static, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - T: Read + Write + Unpin, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { - let this = self.project(); - - match this { - H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx), - H2ClientFutureProject::Send { send_when } => send_when.poll(cx), - H2ClientFutureProject::Task { task } => task.poll(cx), - } - } -} - -struct FutCtx<B> -where - B: Body, -{ - is_connect: bool, - eos: bool, - fut: ResponseFuture, - body_tx: SendStream<SendBuf<B::Data>>, - body: B, - cb: Callback<Request<B>, Response<IncomingBody>>, -} - -impl<B: Body> Unpin for FutCtx<B> {} - -pub(crate) struct ClientTask<B, E, T> -where - B: Body, - E: Unpin, -{ - ping: ping::Recorder, - conn_drop_ref: ConnDropRef, - conn_eof: ConnEof, - executor: E, - h2_tx: SendRequest<SendBuf<B::Data>>, - req_rx: ClientRx<B>, - fut_ctx: Option<FutCtx<B>>, - marker: PhantomData<T>, -} - -impl<B, E, T> ClientTask<B, E, T> -where - B: Body + 'static, - E: Http2ClientConnExec<B, T> + Unpin, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - T: Read + Write + Unpin, -{ - pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { - self.h2_tx.is_extended_connect_protocol_enabled() - } -} - -pin_project! { - pub struct PipeMap<S> - where - S: Body, - { - #[pin] - pipe: PipeToSendStream<S>, - #[pin] - conn_drop_ref: Option<Sender<Infallible>>, - #[pin] - ping: Option<Recorder>, - } -} - -impl<B> Future for PipeMap<B> -where - B: http_body::Body, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { - let mut this = self.project(); - - match this.pipe.poll_unpin(cx) { - Poll::Ready(result) => { - if let Err(_e) = result { - debug!("client request body error: {}", _e); - } - drop(this.conn_drop_ref.take().expect("Future polled twice")); - drop(this.ping.take().expect("Future polled twice")); - return Poll::Ready(()); - } - Poll::Pending => (), - }; - Poll::Pending - } -} - -impl<B, E, T> ClientTask<B, E, T> -where - B: Body + 'static + Unpin, - B::Data: Send, - E: Http2ClientConnExec<B, T> + Unpin, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - T: Read + Write + Unpin, -{ - fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) { - let ping = self.ping.clone(); - - let send_stream = if !f.is_connect { - if !f.eos { - let mut pipe = PipeToSendStream::new(f.body, f.body_tx); - - // eagerly see if the body pipe is ready and - // can thus skip allocating in the executor - match Pin::new(&mut pipe).poll(cx) { - Poll::Ready(_) => (), - Poll::Pending => { - let conn_drop_ref = self.conn_drop_ref.clone(); - // keep the ping recorder's knowledge of an - // "open stream" alive while this body is - // still sending... - let ping = ping.clone(); - - let pipe = PipeMap { - pipe, - conn_drop_ref: Some(conn_drop_ref), - ping: Some(ping), - }; - // Clear send task - self.executor - .execute_h2_future(H2ClientFuture::Pipe { pipe }); - } - } - } - - None - } else { - Some(f.body_tx) - }; - - self.executor.execute_h2_future(H2ClientFuture::Send { - send_when: SendWhen { - when: ResponseFutMap { - fut: f.fut, - ping: Some(ping), - send_stream: Some(send_stream), - }, - call_back: Some(f.cb), - }, - }); - } -} - -pin_project! { - pub(crate) struct ResponseFutMap<B> - where - B: Body, - B: 'static, - { - #[pin] - fut: ResponseFuture, - #[pin] - ping: Option<Recorder>, - #[pin] - send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>, - } -} - -impl<B> Future for ResponseFutMap<B> -where - B: Body + 'static, -{ - type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut this = self.project(); - - let result = ready!(this.fut.poll(cx)); - - let ping = this.ping.take().expect("Future polled twice"); - let send_stream = this.send_stream.take().expect("Future polled twice"); - - match result { - Ok(res) => { - // record that we got the response headers - ping.record_non_data(); - - let content_length = headers::content_length_parse_all(res.headers()); - if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) { - if content_length.map_or(false, |len| len != 0) { - warn!("h2 connect response with non-zero body not supported"); - - send_stream.send_reset(h2::Reason::INTERNAL_ERROR); - return Poll::Ready(Err(( - crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), - None::<Request<B>>, - ))); - } - let (parts, recv_stream) = res.into_parts(); - let mut res = Response::from_parts(parts, IncomingBody::empty()); - - let (pending, on_upgrade) = crate::upgrade::pending(); - let io = H2Upgraded { - ping, - send_stream: unsafe { UpgradedSendStream::new(send_stream) }, - recv_stream, - buf: Bytes::new(), - }; - let upgraded = Upgraded::new(io, Bytes::new()); - - pending.fulfill(upgraded); - res.extensions_mut().insert(on_upgrade); - - Poll::Ready(Ok(res)) - } else { - let res = res.map(|stream| { - let ping = ping.for_stream(&stream); - IncomingBody::h2(stream, content_length.into(), ping) - }); - Poll::Ready(Ok(res)) - } - } - Err(err) => { - ping.ensure_not_timed_out().map_err(|e| (e, None))?; - - debug!("client response error: {}", err); - Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>))) - } - } - } -} - -impl<B, E, T> Future for ClientTask<B, E, T> -where - B: Body + 'static + Unpin, - B::Data: Send, - B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, - E: Http2ClientConnExec<B, T> + Unpin, - T: Read + Write + Unpin, -{ - type Output = crate::Result<Dispatched>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - loop { - match ready!(self.h2_tx.poll_ready(cx)) { - Ok(()) => (), - Err(err) => { - self.ping.ensure_not_timed_out()?; - return if err.reason() == Some(::h2::Reason::NO_ERROR) { - trace!("connection gracefully shutdown"); - Poll::Ready(Ok(Dispatched::Shutdown)) - } else { - Poll::Ready(Err(crate::Error::new_h2(err))) - }; - } - }; - - // If we were waiting on pending open - // continue where we left off. - if let Some(f) = self.fut_ctx.take() { - self.poll_pipe(f, cx); - continue; - } - - match self.req_rx.poll_recv(cx) { - Poll::Ready(Some((req, cb))) => { - // check that future hasn't been canceled already - if cb.is_canceled() { - trace!("request callback is canceled"); - continue; - } - let (head, body) = req.into_parts(); - let mut req = ::http::Request::from_parts(head, ()); - super::strip_connection_headers(req.headers_mut(), true); - if let Some(len) = body.size_hint().exact() { - if len != 0 || headers::method_has_defined_payload_semantics(req.method()) { - headers::set_content_length_if_missing(req.headers_mut(), len); - } - } - - let is_connect = req.method() == Method::CONNECT; - let eos = body.is_end_stream(); - - if is_connect - && headers::content_length_parse_all(req.headers()) - .map_or(false, |len| len != 0) - { - warn!("h2 connect request with non-zero body not supported"); - cb.send(Err(TrySendError { - error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), - message: None, - })); - continue; - } - - if let Some(protocol) = req.extensions_mut().remove::<Protocol>() { - req.extensions_mut().insert(protocol.into_inner()); - } - - let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { - Ok(ok) => ok, - Err(err) => { - debug!("client send request error: {}", err); - cb.send(Err(TrySendError { - error: crate::Error::new_h2(err), - message: None, - })); - continue; - } - }; - - let f = FutCtx { - is_connect, - eos, - fut, - body_tx, - body, - cb, - }; - - // Check poll_ready() again. - // If the call to send_request() resulted in the new stream being pending open - // we have to wait for the open to complete before accepting new requests. - match self.h2_tx.poll_ready(cx) { - Poll::Pending => { - // Save Context - self.fut_ctx = Some(f); - return Poll::Pending; - } - Poll::Ready(Ok(())) => (), - Poll::Ready(Err(err)) => { - f.cb.send(Err(TrySendError { - error: crate::Error::new_h2(err), - message: None, - })); - continue; - } - } - self.poll_pipe(f, cx); - continue; - } - - Poll::Ready(None) => { - trace!("client::dispatch::Sender dropped"); - return Poll::Ready(Ok(Dispatched::Shutdown)); - } - - Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { - // As of Rust 1.82, this pattern is no longer needed, and emits a warning. - // But we cannot remove it as long as MSRV is less than that. - #[allow(unused)] - Ok(never) => match never {}, - Err(_conn_is_eof) => { - trace!("connection task is closed, closing dispatch task"); - return Poll::Ready(Ok(Dispatched::Shutdown)); - } - }, - } - } - } -} diff --git a/vendor/hyper/src/proto/h2/mod.rs b/vendor/hyper/src/proto/h2/mod.rs deleted file mode 100644 index adb6de87..00000000 --- a/vendor/hyper/src/proto/h2/mod.rs +++ /dev/null @@ -1,446 +0,0 @@ -use std::error::Error as StdError; -use std::future::Future; -use std::io::{Cursor, IoSlice}; -use std::mem; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use bytes::{Buf, Bytes}; -use futures_util::ready; -use h2::{Reason, RecvStream, SendStream}; -use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE}; -use http::HeaderMap; -use pin_project_lite::pin_project; - -use crate::body::Body; -use crate::proto::h2::ping::Recorder; -use crate::rt::{Read, ReadBufCursor, Write}; - -pub(crate) mod ping; - -cfg_client! { - pub(crate) mod client; - pub(crate) use self::client::ClientTask; -} - -cfg_server! { - pub(crate) mod server; - pub(crate) use self::server::Server; -} - -/// Default initial stream window size defined in HTTP2 spec. -pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535; - -// List of connection headers from RFC 9110 Section 7.6.1 -// -// TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're -// tested separately. -static CONNECTION_HEADERS: [HeaderName; 4] = [ - HeaderName::from_static("keep-alive"), - HeaderName::from_static("proxy-connection"), - TRANSFER_ENCODING, - UPGRADE, -]; - -fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { - for header in &CONNECTION_HEADERS { - if headers.remove(header).is_some() { - warn!("Connection header illegal in HTTP/2: {}", header.as_str()); - } - } - - if is_request { - if headers - .get(TE) - .map_or(false, |te_header| te_header != "trailers") - { - warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests"); - headers.remove(TE); - } - } else if headers.remove(TE).is_some() { - warn!("TE headers illegal in HTTP/2 responses"); - } - - if let Some(header) = headers.remove(CONNECTION) { - warn!( - "Connection header illegal in HTTP/2: {}", - CONNECTION.as_str() - ); - let header_contents = header.to_str().unwrap(); - - // A `Connection` header may have a comma-separated list of names of other headers that - // are meant for only this specific connection. - // - // Iterate these names and remove them as headers. Connection-specific headers are - // forbidden in HTTP2, as that information has been moved into frame types of the h2 - // protocol. - for name in header_contents.split(',') { - let name = name.trim(); - headers.remove(name); - } - } -} - -// body adapters used by both Client and Server - -pin_project! { - pub(crate) struct PipeToSendStream<S> - where - S: Body, - { - body_tx: SendStream<SendBuf<S::Data>>, - data_done: bool, - #[pin] - stream: S, - } -} - -impl<S> PipeToSendStream<S> -where - S: Body, -{ - fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { - PipeToSendStream { - body_tx: tx, - data_done: false, - stream, - } - } -} - -impl<S> Future for PipeToSendStream<S> -where - S: Body, - S::Error: Into<Box<dyn StdError + Send + Sync>>, -{ - type Output = crate::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut me = self.project(); - loop { - // we don't have the next chunk of data yet, so just reserve 1 byte to make - // sure there's some capacity available. h2 will handle the capacity management - // for the actual body chunk. - me.body_tx.reserve_capacity(1); - - if me.body_tx.capacity() == 0 { - loop { - match ready!(me.body_tx.poll_capacity(cx)) { - Some(Ok(0)) => {} - Some(Ok(_)) => break, - Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))), - None => { - // None means the stream is no longer in a - // streaming state, we either finished it - // somehow, or the remote reset us. - return Poll::Ready(Err(crate::Error::new_body_write( - "send stream capacity unexpectedly closed", - ))); - } - } - } - } else if let Poll::Ready(reason) = me - .body_tx - .poll_reset(cx) - .map_err(crate::Error::new_body_write)? - { - debug!("stream received RST_STREAM: {:?}", reason); - return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); - } - - match ready!(me.stream.as_mut().poll_frame(cx)) { - Some(Ok(frame)) => { - if frame.is_data() { - let chunk = frame.into_data().unwrap_or_else(|_| unreachable!()); - let is_eos = me.stream.is_end_stream(); - trace!( - "send body chunk: {} bytes, eos={}", - chunk.remaining(), - is_eos, - ); - - let buf = SendBuf::Buf(chunk); - me.body_tx - .send_data(buf, is_eos) - .map_err(crate::Error::new_body_write)?; - - if is_eos { - return Poll::Ready(Ok(())); - } - } else if frame.is_trailers() { - // no more DATA, so give any capacity back - me.body_tx.reserve_capacity(0); - me.body_tx - .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!())) - .map_err(crate::Error::new_body_write)?; - return Poll::Ready(Ok(())); - } else { - trace!("discarding unknown frame"); - // loop again - } - } - Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), - None => { - // no more frames means we're done here - // but at this point, we haven't sent an EOS DATA, or - // any trailers, so send an empty EOS DATA. - return Poll::Ready(me.body_tx.send_eos_frame()); - } - } - } - } -} - -trait SendStreamExt { - fn on_user_err<E>(&mut self, err: E) -> crate::Error - where - E: Into<Box<dyn std::error::Error + Send + Sync>>; - fn send_eos_frame(&mut self) -> crate::Result<()>; -} - -impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> { - fn on_user_err<E>(&mut self, err: E) -> crate::Error - where - E: Into<Box<dyn std::error::Error + Send + Sync>>, - { - let err = crate::Error::new_user_body(err); - debug!("send body user stream error: {}", err); - self.send_reset(err.h2_reason()); - err - } - - fn send_eos_frame(&mut self) -> crate::Result<()> { - trace!("send body eos"); - self.send_data(SendBuf::None, true) - .map_err(crate::Error::new_body_write) - } -} - -#[repr(usize)] -enum SendBuf<B> { - Buf(B), - Cursor(Cursor<Box<[u8]>>), - None, -} - -impl<B: Buf> Buf for SendBuf<B> { - #[inline] - fn remaining(&self) -> usize { - match *self { - Self::Buf(ref b) => b.remaining(), - Self::Cursor(ref c) => Buf::remaining(c), - Self::None => 0, - } - } - - #[inline] - fn chunk(&self) -> &[u8] { - match *self { - Self::Buf(ref b) => b.chunk(), - Self::Cursor(ref c) => c.chunk(), - Self::None => &[], - } - } - - #[inline] - fn advance(&mut self, cnt: usize) { - match *self { - Self::Buf(ref mut b) => b.advance(cnt), - Self::Cursor(ref mut c) => c.advance(cnt), - Self::None => {} - } - } - - fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { - match *self { - Self::Buf(ref b) => b.chunks_vectored(dst), - Self::Cursor(ref c) => c.chunks_vectored(dst), - Self::None => 0, - } - } -} - -struct H2Upgraded<B> -where - B: Buf, -{ - ping: Recorder, - send_stream: UpgradedSendStream<B>, - recv_stream: RecvStream, - buf: Bytes, -} - -impl<B> Read for H2Upgraded<B> -where - B: Buf, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut read_buf: ReadBufCursor<'_>, - ) -> Poll<Result<(), std::io::Error>> { - if self.buf.is_empty() { - self.buf = loop { - match ready!(self.recv_stream.poll_data(cx)) { - None => return Poll::Ready(Ok(())), - Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => { - continue - } - Some(Ok(buf)) => { - self.ping.record_data(buf.len()); - break buf; - } - Some(Err(e)) => { - return Poll::Ready(match e.reason() { - Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()), - Some(Reason::STREAM_CLOSED) => { - Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) - } - _ => Err(h2_to_io_error(e)), - }) - } - } - }; - } - let cnt = std::cmp::min(self.buf.len(), read_buf.remaining()); - read_buf.put_slice(&self.buf[..cnt]); - self.buf.advance(cnt); - let _ = self.recv_stream.flow_control().release_capacity(cnt); - Poll::Ready(Ok(())) - } -} - -impl<B> Write for H2Upgraded<B> -where - B: Buf, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<Result<usize, std::io::Error>> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - self.send_stream.reserve_capacity(buf.len()); - - // We ignore all errors returned by `poll_capacity` and `write`, as we - // will get the correct from `poll_reset` anyway. - let cnt = match ready!(self.send_stream.poll_capacity(cx)) { - None => Some(0), - Some(Ok(cnt)) => self - .send_stream - .write(&buf[..cnt], false) - .ok() - .map(|()| cnt), - Some(Err(_)) => None, - }; - - if let Some(cnt) = cnt { - return Poll::Ready(Ok(cnt)); - } - - Poll::Ready(Err(h2_to_io_error( - match ready!(self.send_stream.poll_reset(cx)) { - Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { - return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) - } - Ok(reason) => reason.into(), - Err(e) => e, - }, - ))) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll<Result<(), std::io::Error>> { - if self.send_stream.write(&[], true).is_ok() { - return Poll::Ready(Ok(())); - } - - Poll::Ready(Err(h2_to_io_error( - match ready!(self.send_stream.poll_reset(cx)) { - Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())), - Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { - return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) - } - Ok(reason) => reason.into(), - Err(e) => e, - }, - ))) - } -} - -fn h2_to_io_error(e: h2::Error) -> std::io::Error { - if e.is_io() { - e.into_io().unwrap() - } else { - std::io::Error::new(std::io::ErrorKind::Other, e) - } -} - -struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>); - -impl<B> UpgradedSendStream<B> -where - B: Buf, -{ - unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self { - assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>()); - Self(mem::transmute(inner)) - } - - fn reserve_capacity(&mut self, cnt: usize) { - unsafe { self.as_inner_unchecked().reserve_capacity(cnt) } - } - - fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> { - unsafe { self.as_inner_unchecked().poll_capacity(cx) } - } - - fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> { - unsafe { self.as_inner_unchecked().poll_reset(cx) } - } - - fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> { - let send_buf = SendBuf::Cursor(Cursor::new(buf.into())); - unsafe { - self.as_inner_unchecked() - .send_data(send_buf, end_of_stream) - .map_err(h2_to_io_error) - } - } - - unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> { - &mut *(&mut self.0 as *mut _ as *mut _) - } -} - -#[repr(transparent)] -struct Neutered<B> { - _inner: B, - impossible: Impossible, -} - -enum Impossible {} - -unsafe impl<B> Send for Neutered<B> {} - -impl<B> Buf for Neutered<B> { - fn remaining(&self) -> usize { - match self.impossible {} - } - - fn chunk(&self) -> &[u8] { - match self.impossible {} - } - - fn advance(&mut self, _cnt: usize) { - match self.impossible {} - } -} diff --git a/vendor/hyper/src/proto/h2/ping.rs b/vendor/hyper/src/proto/h2/ping.rs deleted file mode 100644 index 749cf1b7..00000000 --- a/vendor/hyper/src/proto/h2/ping.rs +++ /dev/null @@ -1,509 +0,0 @@ -/// HTTP2 Ping usage -/// -/// hyper uses HTTP2 pings for two purposes: -/// -/// 1. Adaptive flow control using BDP -/// 2. Connection keep-alive -/// -/// Both cases are optional. -/// -/// # BDP Algorithm -/// -/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: -/// 1a. Record current time. -/// 1b. Send a BDP ping. -/// 2. Increment the number of received bytes. -/// 3. When the BDP ping ack is received: -/// 3a. Record duration from sent time. -/// 3b. Merge RTT with a running average. -/// 3c. Calculate bdp as bytes/rtt. -/// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{self, Poll}; -use std::time::{Duration, Instant}; - -use h2::{Ping, PingPong}; - -use crate::common::time::Time; -use crate::rt::Sleep; - -type WindowSize = u32; - -pub(super) fn disabled() -> Recorder { - Recorder { shared: None } -} - -pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) { - debug_assert!( - config.is_enabled(), - "ping channel requires bdp or keep-alive config", - ); - - let bdp = config.bdp_initial_window.map(|wnd| Bdp { - bdp: wnd, - max_bandwidth: 0.0, - rtt: 0.0, - ping_delay: Duration::from_millis(100), - stable_count: 0, - }); - - let (bytes, next_bdp_at) = if bdp.is_some() { - (Some(0), Some(Instant::now())) - } else { - (None, None) - }; - - let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { - interval, - timeout: config.keep_alive_timeout, - while_idle: config.keep_alive_while_idle, - sleep: __timer.sleep(interval), - state: KeepAliveState::Init, - timer: __timer, - }); - - let last_read_at = keep_alive.as_ref().map(|_| Instant::now()); - - let shared = Arc::new(Mutex::new(Shared { - bytes, - last_read_at, - is_keep_alive_timed_out: false, - ping_pong, - ping_sent_at: None, - next_bdp_at, - })); - - ( - Recorder { - shared: Some(shared.clone()), - }, - Ponger { - bdp, - keep_alive, - shared, - }, - ) -} - -#[derive(Clone)] -pub(super) struct Config { - pub(super) bdp_initial_window: Option<WindowSize>, - /// If no frames are received in this amount of time, a PING frame is sent. - pub(super) keep_alive_interval: Option<Duration>, - /// After sending a keepalive PING, the connection will be closed if - /// a pong is not received in this amount of time. - pub(super) keep_alive_timeout: Duration, - /// If true, sends pings even when there are no active streams. - pub(super) keep_alive_while_idle: bool, -} - -#[derive(Clone)] -pub(crate) struct Recorder { - shared: Option<Arc<Mutex<Shared>>>, -} - -pub(super) struct Ponger { - bdp: Option<Bdp>, - keep_alive: Option<KeepAlive>, - shared: Arc<Mutex<Shared>>, -} - -struct Shared { - ping_pong: PingPong, - ping_sent_at: Option<Instant>, - - // bdp - /// If `Some`, bdp is enabled, and this tracks how many bytes have been - /// read during the current sample. - bytes: Option<usize>, - /// We delay a variable amount of time between BDP pings. This allows us - /// to send less pings as the bandwidth stabilizes. - next_bdp_at: Option<Instant>, - - // keep-alive - /// If `Some`, keep-alive is enabled, and the Instant is how long ago - /// the connection read the last frame. - last_read_at: Option<Instant>, - - is_keep_alive_timed_out: bool, -} - -struct Bdp { - /// Current BDP in bytes - bdp: u32, - /// Largest bandwidth we've seen so far. - max_bandwidth: f64, - /// Round trip time in seconds - rtt: f64, - /// Delay the next ping by this amount. - /// - /// This will change depending on how stable the current bandwidth is. - ping_delay: Duration, - /// The count of ping round trips where BDP has stayed the same. - stable_count: u32, -} - -struct KeepAlive { - /// If no frames are received in this amount of time, a PING frame is sent. - interval: Duration, - /// After sending a keepalive PING, the connection will be closed if - /// a pong is not received in this amount of time. - timeout: Duration, - /// If true, sends pings even when there are no active streams. - while_idle: bool, - state: KeepAliveState, - sleep: Pin<Box<dyn Sleep>>, - timer: Time, -} - -enum KeepAliveState { - Init, - Scheduled(Instant), - PingSent, -} - -pub(super) enum Ponged { - SizeUpdate(WindowSize), - KeepAliveTimedOut, -} - -#[derive(Debug)] -pub(super) struct KeepAliveTimedOut; - -// ===== impl Config ===== - -impl Config { - pub(super) fn is_enabled(&self) -> bool { - self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some() - } -} - -// ===== impl Recorder ===== - -impl Recorder { - pub(crate) fn record_data(&self, len: usize) { - let shared = if let Some(ref shared) = self.shared { - shared - } else { - return; - }; - - let mut locked = shared.lock().unwrap(); - - locked.update_last_read_at(); - - // are we ready to send another bdp ping? - // if not, we don't need to record bytes either - - if let Some(ref next_bdp_at) = locked.next_bdp_at { - if Instant::now() < *next_bdp_at { - return; - } else { - locked.next_bdp_at = None; - } - } - - if let Some(ref mut bytes) = locked.bytes { - *bytes += len; - } else { - // no need to send bdp ping if bdp is disabled - return; - } - - if !locked.is_ping_sent() { - locked.send_ping(); - } - } - - pub(crate) fn record_non_data(&self) { - let shared = if let Some(ref shared) = self.shared { - shared - } else { - return; - }; - - let mut locked = shared.lock().unwrap(); - - locked.update_last_read_at(); - } - - /// If the incoming stream is already closed, convert self into - /// a disabled reporter. - #[cfg(feature = "client")] - pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self { - if stream.is_end_stream() { - disabled() - } else { - self - } - } - - pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> { - if let Some(ref shared) = self.shared { - let locked = shared.lock().unwrap(); - if locked.is_keep_alive_timed_out { - return Err(KeepAliveTimedOut.crate_error()); - } - } - - // else - Ok(()) - } -} - -// ===== impl Ponger ===== - -impl Ponger { - pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> { - let now = Instant::now(); - let mut locked = self.shared.lock().unwrap(); - let is_idle = self.is_idle(); - - if let Some(ref mut ka) = self.keep_alive { - ka.maybe_schedule(is_idle, &locked); - ka.maybe_ping(cx, is_idle, &mut locked); - } - - if !locked.is_ping_sent() { - // XXX: this doesn't register a waker...? - return Poll::Pending; - } - - match locked.ping_pong.poll_pong(cx) { - Poll::Ready(Ok(_pong)) => { - let start = locked - .ping_sent_at - .expect("pong received implies ping_sent_at"); - locked.ping_sent_at = None; - let rtt = now - start; - trace!("recv pong"); - - if let Some(ref mut ka) = self.keep_alive { - locked.update_last_read_at(); - ka.maybe_schedule(is_idle, &locked); - ka.maybe_ping(cx, is_idle, &mut locked); - } - - if let Some(ref mut bdp) = self.bdp { - let bytes = locked.bytes.expect("bdp enabled implies bytes"); - locked.bytes = Some(0); // reset - trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); - - let update = bdp.calculate(bytes, rtt); - locked.next_bdp_at = Some(now + bdp.ping_delay); - if let Some(update) = update { - return Poll::Ready(Ponged::SizeUpdate(update)); - } - } - } - Poll::Ready(Err(_e)) => { - debug!("pong error: {}", _e); - } - Poll::Pending => { - if let Some(ref mut ka) = self.keep_alive { - if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) { - self.keep_alive = None; - locked.is_keep_alive_timed_out = true; - return Poll::Ready(Ponged::KeepAliveTimedOut); - } - } - } - } - - // XXX: this doesn't register a waker...? - Poll::Pending - } - - fn is_idle(&self) -> bool { - Arc::strong_count(&self.shared) <= 2 - } -} - -// ===== impl Shared ===== - -impl Shared { - fn send_ping(&mut self) { - match self.ping_pong.send_ping(Ping::opaque()) { - Ok(()) => { - self.ping_sent_at = Some(Instant::now()); - trace!("sent ping"); - } - Err(_err) => { - debug!("error sending ping: {}", _err); - } - } - } - - fn is_ping_sent(&self) -> bool { - self.ping_sent_at.is_some() - } - - fn update_last_read_at(&mut self) { - if self.last_read_at.is_some() { - self.last_read_at = Some(Instant::now()); - } - } - - fn last_read_at(&self) -> Instant { - self.last_read_at.expect("keep_alive expects last_read_at") - } -} - -// ===== impl Bdp ===== - -/// Any higher than this likely will be hitting the TCP flow control. -const BDP_LIMIT: usize = 1024 * 1024 * 16; - -impl Bdp { - fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> { - // No need to do any math if we're at the limit. - if self.bdp as usize == BDP_LIMIT { - self.stabilize_delay(); - return None; - } - - // average the rtt - let rtt = seconds(rtt); - if self.rtt == 0.0 { - // First sample means rtt is first rtt. - self.rtt = rtt; - } else { - // Weigh this rtt as 1/8 for a moving average. - self.rtt += (rtt - self.rtt) * 0.125; - } - - // calculate the current bandwidth - let bw = (bytes as f64) / (self.rtt * 1.5); - trace!("current bandwidth = {:.1}B/s", bw); - - if bw < self.max_bandwidth { - // not a faster bandwidth, so don't update - self.stabilize_delay(); - return None; - } else { - self.max_bandwidth = bw; - } - - // if the current `bytes` sample is at least 2/3 the previous - // bdp, increase to double the current sample. - if bytes >= self.bdp as usize * 2 / 3 { - self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; - trace!("BDP increased to {}", self.bdp); - - self.stable_count = 0; - self.ping_delay /= 2; - Some(self.bdp) - } else { - self.stabilize_delay(); - None - } - } - - fn stabilize_delay(&mut self) { - if self.ping_delay < Duration::from_secs(10) { - self.stable_count += 1; - - if self.stable_count >= 2 { - self.ping_delay *= 4; - self.stable_count = 0; - } - } - } -} - -fn seconds(dur: Duration) -> f64 { - const NANOS_PER_SEC: f64 = 1_000_000_000.0; - let secs = dur.as_secs() as f64; - secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC -} - -// ===== impl KeepAlive ===== - -impl KeepAlive { - fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) { - match self.state { - KeepAliveState::Init => { - if !self.while_idle && is_idle { - return; - } - - self.schedule(shared); - } - KeepAliveState::PingSent => { - if shared.is_ping_sent() { - return; - } - self.schedule(shared); - } - KeepAliveState::Scheduled(..) => (), - } - } - - fn schedule(&mut self, shared: &Shared) { - let interval = shared.last_read_at() + self.interval; - self.state = KeepAliveState::Scheduled(interval); - self.timer.reset(&mut self.sleep, interval); - } - - fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) { - match self.state { - KeepAliveState::Scheduled(at) => { - if Pin::new(&mut self.sleep).poll(cx).is_pending() { - return; - } - // check if we've received a frame while we were scheduled - if shared.last_read_at() + self.interval > at { - self.state = KeepAliveState::Init; - cx.waker().wake_by_ref(); // schedule us again - return; - } - if !self.while_idle && is_idle { - trace!("keep-alive no need to ping when idle and while_idle=false"); - return; - } - trace!("keep-alive interval ({:?}) reached", self.interval); - shared.send_ping(); - self.state = KeepAliveState::PingSent; - let timeout = Instant::now() + self.timeout; - self.timer.reset(&mut self.sleep, timeout); - } - KeepAliveState::Init | KeepAliveState::PingSent => (), - } - } - - fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> { - match self.state { - KeepAliveState::PingSent => { - if Pin::new(&mut self.sleep).poll(cx).is_pending() { - return Ok(()); - } - trace!("keep-alive timeout ({:?}) reached", self.timeout); - Err(KeepAliveTimedOut) - } - KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()), - } - } -} - -// ===== impl KeepAliveTimedOut ===== - -impl KeepAliveTimedOut { - pub(super) fn crate_error(self) -> crate::Error { - crate::Error::new(crate::error::Kind::Http2).with(self) - } -} - -impl fmt::Display for KeepAliveTimedOut { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("keep-alive timed out") - } -} - -impl std::error::Error for KeepAliveTimedOut { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(&crate::error::TimedOut) - } -} diff --git a/vendor/hyper/src/proto/h2/server.rs b/vendor/hyper/src/proto/h2/server.rs deleted file mode 100644 index a8a20dd6..00000000 --- a/vendor/hyper/src/proto/h2/server.rs +++ /dev/null @@ -1,545 +0,0 @@ -use std::error::Error as StdError; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use bytes::Bytes; -use futures_util::ready; -use h2::server::{Connection, Handshake, SendResponse}; -use h2::{Reason, RecvStream}; -use http::{Method, Request}; -use pin_project_lite::pin_project; - -use super::{ping, PipeToSendStream, SendBuf}; -use crate::body::{Body, Incoming as IncomingBody}; -use crate::common::date; -use crate::common::io::Compat; -use crate::common::time::Time; -use crate::ext::Protocol; -use crate::headers; -use crate::proto::h2::ping::Recorder; -use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; -use crate::proto::Dispatched; -use crate::rt::bounds::Http2ServerConnExec; -use crate::rt::{Read, Write}; -use crate::service::HttpService; - -use crate::upgrade::{OnUpgrade, Pending, Upgraded}; -use crate::Response; - -// Our defaults are chosen for the "majority" case, which usually are not -// resource constrained, and so the spec default of 64kb can be too limiting -// for performance. -// -// At the same time, a server more often has multiple clients connected, and -// so is more likely to use more resources than a client would. -const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb -const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb -const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb -const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb -const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb -const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024; - -#[derive(Clone, Debug)] -pub(crate) struct Config { - pub(crate) adaptive_window: bool, - pub(crate) initial_conn_window_size: u32, - pub(crate) initial_stream_window_size: u32, - pub(crate) max_frame_size: u32, - pub(crate) enable_connect_protocol: bool, - pub(crate) max_concurrent_streams: Option<u32>, - pub(crate) max_pending_accept_reset_streams: Option<usize>, - pub(crate) max_local_error_reset_streams: Option<usize>, - pub(crate) keep_alive_interval: Option<Duration>, - pub(crate) keep_alive_timeout: Duration, - pub(crate) max_send_buffer_size: usize, - pub(crate) max_header_list_size: u32, - pub(crate) date_header: bool, -} - -impl Default for Config { - fn default() -> Config { - Config { - adaptive_window: false, - initial_conn_window_size: DEFAULT_CONN_WINDOW, - initial_stream_window_size: DEFAULT_STREAM_WINDOW, - max_frame_size: DEFAULT_MAX_FRAME_SIZE, - enable_connect_protocol: false, - max_concurrent_streams: Some(200), - max_pending_accept_reset_streams: None, - max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS), - keep_alive_interval: None, - keep_alive_timeout: Duration::from_secs(20), - max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, - max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, - date_header: true, - } - } -} - -pin_project! { - pub(crate) struct Server<T, S, B, E> - where - S: HttpService<IncomingBody>, - B: Body, - { - exec: E, - timer: Time, - service: S, - state: State<T, B>, - date_header: bool, - close_pending: bool - } -} - -enum State<T, B> -where - B: Body, -{ - Handshaking { - ping_config: ping::Config, - hs: Handshake<Compat<T>, SendBuf<B::Data>>, - }, - Serving(Serving<T, B>), -} - -struct Serving<T, B> -where - B: Body, -{ - ping: Option<(ping::Recorder, ping::Ponger)>, - conn: Connection<Compat<T>, SendBuf<B::Data>>, - closing: Option<crate::Error>, - date_header: bool, -} - -impl<T, S, B, E> Server<T, S, B, E> -where - T: Read + Write + Unpin, - S: HttpService<IncomingBody, ResBody = B>, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - E: Http2ServerConnExec<S::Future, B>, -{ - pub(crate) fn new( - io: T, - service: S, - config: &Config, - exec: E, - timer: Time, - ) -> Server<T, S, B, E> { - let mut builder = h2::server::Builder::default(); - builder - .initial_window_size(config.initial_stream_window_size) - .initial_connection_window_size(config.initial_conn_window_size) - .max_frame_size(config.max_frame_size) - .max_header_list_size(config.max_header_list_size) - .max_local_error_reset_streams(config.max_local_error_reset_streams) - .max_send_buffer_size(config.max_send_buffer_size); - if let Some(max) = config.max_concurrent_streams { - builder.max_concurrent_streams(max); - } - if let Some(max) = config.max_pending_accept_reset_streams { - builder.max_pending_accept_reset_streams(max); - } - if config.enable_connect_protocol { - builder.enable_connect_protocol(); - } - let handshake = builder.handshake(Compat::new(io)); - - let bdp = if config.adaptive_window { - Some(config.initial_stream_window_size) - } else { - None - }; - - let ping_config = ping::Config { - bdp_initial_window: bdp, - keep_alive_interval: config.keep_alive_interval, - keep_alive_timeout: config.keep_alive_timeout, - // If keep-alive is enabled for servers, always enabled while - // idle, so it can more aggressively close dead connections. - keep_alive_while_idle: true, - }; - - Server { - exec, - timer, - state: State::Handshaking { - ping_config, - hs: handshake, - }, - service, - date_header: config.date_header, - close_pending: false, - } - } - - pub(crate) fn graceful_shutdown(&mut self) { - trace!("graceful_shutdown"); - match self.state { - State::Handshaking { .. } => { - self.close_pending = true; - } - State::Serving(ref mut srv) => { - if srv.closing.is_none() { - srv.conn.graceful_shutdown(); - } - } - } - } -} - -impl<T, S, B, E> Future for Server<T, S, B, E> -where - T: Read + Write + Unpin, - S: HttpService<IncomingBody, ResBody = B>, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - B: Body + 'static, - E: Http2ServerConnExec<S::Future, B>, -{ - type Output = crate::Result<Dispatched>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let me = &mut *self; - loop { - let next = match me.state { - State::Handshaking { - ref mut hs, - ref ping_config, - } => { - let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?; - let ping = if ping_config.is_enabled() { - let pp = conn.ping_pong().expect("conn.ping_pong"); - Some(ping::channel(pp, ping_config.clone(), me.timer.clone())) - } else { - None - }; - State::Serving(Serving { - ping, - conn, - closing: None, - date_header: me.date_header, - }) - } - State::Serving(ref mut srv) => { - // graceful_shutdown was called before handshaking finished, - if me.close_pending && srv.closing.is_none() { - srv.conn.graceful_shutdown(); - } - ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; - return Poll::Ready(Ok(Dispatched::Shutdown)); - } - }; - me.state = next; - } - } -} - -impl<T, B> Serving<T, B> -where - T: Read + Write + Unpin, - B: Body + 'static, -{ - fn poll_server<S, E>( - &mut self, - cx: &mut Context<'_>, - service: &mut S, - exec: &mut E, - ) -> Poll<crate::Result<()>> - where - S: HttpService<IncomingBody, ResBody = B>, - S::Error: Into<Box<dyn StdError + Send + Sync>>, - E: Http2ServerConnExec<S::Future, B>, - { - if self.closing.is_none() { - loop { - self.poll_ping(cx); - - match ready!(self.conn.poll_accept(cx)) { - Some(Ok((req, mut respond))) => { - trace!("incoming request"); - let content_length = headers::content_length_parse_all(req.headers()); - let ping = self - .ping - .as_ref() - .map(|ping| ping.0.clone()) - .unwrap_or_else(ping::disabled); - - // Record the headers received - ping.record_non_data(); - - let is_connect = req.method() == Method::CONNECT; - let (mut parts, stream) = req.into_parts(); - let (mut req, connect_parts) = if !is_connect { - ( - Request::from_parts( - parts, - IncomingBody::h2(stream, content_length.into(), ping), - ), - None, - ) - } else { - if content_length.map_or(false, |len| len != 0) { - warn!("h2 connect request with non-zero body not supported"); - respond.send_reset(h2::Reason::INTERNAL_ERROR); - return Poll::Ready(Ok(())); - } - let (pending, upgrade) = crate::upgrade::pending(); - debug_assert!(parts.extensions.get::<OnUpgrade>().is_none()); - parts.extensions.insert(upgrade); - ( - Request::from_parts(parts, IncomingBody::empty()), - Some(ConnectParts { - pending, - ping, - recv_stream: stream, - }), - ) - }; - - if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() { - req.extensions_mut().insert(Protocol::from_inner(protocol)); - } - - let fut = H2Stream::new( - service.call(req), - connect_parts, - respond, - self.date_header, - ); - - exec.execute_h2stream(fut); - } - Some(Err(e)) => { - return Poll::Ready(Err(crate::Error::new_h2(e))); - } - None => { - // no more incoming streams... - if let Some((ref ping, _)) = self.ping { - ping.ensure_not_timed_out()?; - } - - trace!("incoming connection complete"); - return Poll::Ready(Ok(())); - } - } - } - } - - debug_assert!( - self.closing.is_some(), - "poll_server broke loop without closing" - ); - - ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; - - Poll::Ready(Err(self.closing.take().expect("polled after error"))) - } - - fn poll_ping(&mut self, cx: &mut Context<'_>) { - if let Some((_, ref mut estimator)) = self.ping { - match estimator.poll(cx) { - Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { - self.conn.set_target_window_size(wnd); - let _ = self.conn.set_initial_window_size(wnd); - } - Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { - debug!("keep-alive timed out, closing connection"); - self.conn.abrupt_shutdown(h2::Reason::NO_ERROR); - } - Poll::Pending => {} - } - } - } -} - -pin_project! { - #[allow(missing_debug_implementations)] - pub struct H2Stream<F, B> - where - B: Body, - { - reply: SendResponse<SendBuf<B::Data>>, - #[pin] - state: H2StreamState<F, B>, - date_header: bool, - } -} - -pin_project! { - #[project = H2StreamStateProj] - enum H2StreamState<F, B> - where - B: Body, - { - Service { - #[pin] - fut: F, - connect_parts: Option<ConnectParts>, - }, - Body { - #[pin] - pipe: PipeToSendStream<B>, - }, - } -} - -struct ConnectParts { - pending: Pending, - ping: Recorder, - recv_stream: RecvStream, -} - -impl<F, B> H2Stream<F, B> -where - B: Body, -{ - fn new( - fut: F, - connect_parts: Option<ConnectParts>, - respond: SendResponse<SendBuf<B::Data>>, - date_header: bool, - ) -> H2Stream<F, B> { - H2Stream { - reply: respond, - state: H2StreamState::Service { fut, connect_parts }, - date_header, - } - } -} - -macro_rules! reply { - ($me:expr, $res:expr, $eos:expr) => {{ - match $me.reply.send_response($res, $eos) { - Ok(tx) => tx, - Err(e) => { - debug!("send response error: {}", e); - $me.reply.send_reset(Reason::INTERNAL_ERROR); - return Poll::Ready(Err(crate::Error::new_h2(e))); - } - } - }}; -} - -impl<F, B, E> H2Stream<F, B> -where - F: Future<Output = Result<Response<B>, E>>, - B: Body, - B::Data: 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - E: Into<Box<dyn StdError + Send + Sync>>, -{ - fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { - let mut me = self.project(); - loop { - let next = match me.state.as_mut().project() { - H2StreamStateProj::Service { - fut: h, - connect_parts, - } => { - let res = match h.poll(cx) { - Poll::Ready(Ok(r)) => r, - Poll::Pending => { - // Response is not yet ready, so we want to check if the client has sent a - // RST_STREAM frame which would cancel the current request. - if let Poll::Ready(reason) = - me.reply.poll_reset(cx).map_err(crate::Error::new_h2)? - { - debug!("stream received RST_STREAM: {:?}", reason); - return Poll::Ready(Err(crate::Error::new_h2(reason.into()))); - } - return Poll::Pending; - } - Poll::Ready(Err(e)) => { - let err = crate::Error::new_user_service(e); - warn!("http2 service errored: {}", err); - me.reply.send_reset(err.h2_reason()); - return Poll::Ready(Err(err)); - } - }; - - let (head, body) = res.into_parts(); - let mut res = ::http::Response::from_parts(head, ()); - super::strip_connection_headers(res.headers_mut(), false); - - // set Date header if it isn't already set if instructed - if *me.date_header { - res.headers_mut() - .entry(::http::header::DATE) - .or_insert_with(date::update_and_header_value); - } - - if let Some(connect_parts) = connect_parts.take() { - if res.status().is_success() { - if headers::content_length_parse_all(res.headers()) - .map_or(false, |len| len != 0) - { - warn!("h2 successful response to CONNECT request with body not supported"); - me.reply.send_reset(h2::Reason::INTERNAL_ERROR); - return Poll::Ready(Err(crate::Error::new_user_header())); - } - if res - .headers_mut() - .remove(::http::header::CONTENT_LENGTH) - .is_some() - { - warn!("successful response to CONNECT request disallows content-length header"); - } - let send_stream = reply!(me, res, false); - connect_parts.pending.fulfill(Upgraded::new( - H2Upgraded { - ping: connect_parts.ping, - recv_stream: connect_parts.recv_stream, - send_stream: unsafe { UpgradedSendStream::new(send_stream) }, - buf: Bytes::new(), - }, - Bytes::new(), - )); - return Poll::Ready(Ok(())); - } - } - - if !body.is_end_stream() { - // automatically set Content-Length from body... - if let Some(len) = body.size_hint().exact() { - headers::set_content_length_if_missing(res.headers_mut(), len); - } - - let body_tx = reply!(me, res, false); - H2StreamState::Body { - pipe: PipeToSendStream::new(body, body_tx), - } - } else { - reply!(me, res, true); - return Poll::Ready(Ok(())); - } - } - H2StreamStateProj::Body { pipe } => { - return pipe.poll(cx); - } - }; - me.state.set(next); - } - } -} - -impl<F, B, E> Future for H2Stream<F, B> -where - F: Future<Output = Result<Response<B>, E>>, - B: Body, - B::Data: 'static, - B::Error: Into<Box<dyn StdError + Send + Sync>>, - E: Into<Box<dyn StdError + Send + Sync>>, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - self.poll2(cx).map(|res| { - if let Err(_e) = res { - debug!("stream error: {}", _e); - } - }) - } -} |
