path: root/vendor/hyper/src/proto/h2
author    mo khan <mo@mokhan.ca>  2025-07-15 16:37:08 -0600
committer mo khan <mo@mokhan.ca>  2025-07-17 16:30:22 -0600
commit    45df4d0d9b577fecee798d672695fe24ff57fb1b (patch)
tree      1b99bf645035b58e0d6db08c7a83521f41f7a75b /vendor/hyper/src/proto/h2
parent    f94f79608393d4ab127db63cc41668445ef6b243 (diff)
feat: migrate from Cedar to SpiceDB authorization system
This is a major architectural change that replaces the Cedar policy-based
authorization system with SpiceDB's relation-based authorization.

Key changes:
- Migrate from the Rust implementation to Go
- Replace Cedar policies with a SpiceDB schema and relationships
- Switch from Envoy `ext_authz` with Cedar to SpiceDB permission checks
- Update the build system and dependencies for the Go ecosystem
- Maintain the Envoy integration for external authorization

This change enables more flexible permission modeling through SpiceDB's
Google Zanzibar-inspired relation-based system, supporting complex
hierarchical permissions that were difficult to express in Cedar.

Breaking change: existing Cedar policies and Rust-based configuration will
no longer work and must be migrated to the SpiceDB schema.
Diffstat (limited to 'vendor/hyper/src/proto/h2')
-rw-r--r--  vendor/hyper/src/proto/h2/client.rs  749
-rw-r--r--  vendor/hyper/src/proto/h2/mod.rs     446
-rw-r--r--  vendor/hyper/src/proto/h2/ping.rs    509
-rw-r--r--  vendor/hyper/src/proto/h2/server.rs  545
4 files changed, 0 insertions, 2249 deletions
diff --git a/vendor/hyper/src/proto/h2/client.rs b/vendor/hyper/src/proto/h2/client.rs
deleted file mode 100644
index 5e9641e4..00000000
--- a/vendor/hyper/src/proto/h2/client.rs
+++ /dev/null
@@ -1,749 +0,0 @@
-use std::{
- convert::Infallible,
- future::Future,
- marker::PhantomData,
- pin::Pin,
- task::{Context, Poll},
- time::Duration,
-};
-
-use crate::rt::{Read, Write};
-use bytes::Bytes;
-use futures_channel::mpsc::{Receiver, Sender};
-use futures_channel::{mpsc, oneshot};
-use futures_util::future::{Either, FusedFuture, FutureExt as _};
-use futures_util::ready;
-use futures_util::stream::{StreamExt as _, StreamFuture};
-use h2::client::{Builder, Connection, SendRequest};
-use h2::SendStream;
-use http::{Method, StatusCode};
-use pin_project_lite::pin_project;
-
-use super::ping::{Ponger, Recorder};
-use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
-use crate::body::{Body, Incoming as IncomingBody};
-use crate::client::dispatch::{Callback, SendWhen, TrySendError};
-use crate::common::io::Compat;
-use crate::common::time::Time;
-use crate::ext::Protocol;
-use crate::headers;
-use crate::proto::h2::UpgradedSendStream;
-use crate::proto::Dispatched;
-use crate::rt::bounds::Http2ClientConnExec;
-use crate::upgrade::Upgraded;
-use crate::{Request, Response};
-use h2::client::ResponseFuture;
-
-type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
-
-///// An mpsc channel is used to help notify the `Connection` task when *all*
-///// other handles to it have been dropped, so that it can shutdown.
-type ConnDropRef = mpsc::Sender<Infallible>;
-
-///// A oneshot channel watches the `Connection` task, and when it completes,
-///// the "dispatch" task will be notified and can shutdown sooner.
-type ConnEof = oneshot::Receiver<Infallible>;
-
-// Our defaults are chosen for the "majority" case, which usually is not
-// resource constrained, and so the spec default of 64kb can be too limiting
-// for performance.
-const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
-const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
-const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
-const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
-const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb
-
-// The maximum number of concurrent streams that the client is allowed to open
-// before it receives the initial SETTINGS frame from the server.
-// This default value is derived from what the HTTP/2 spec recommends as the
-// minimum value that endpoints advertise to their peers. Using this value
-// minimizes the chance of the failure mode where the local endpoint
-// attempts to open too many streams and is rejected by the remote peer
-// with the `REFUSED_STREAM` error.
-const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
-
-#[derive(Clone, Debug)]
-pub(crate) struct Config {
- pub(crate) adaptive_window: bool,
- pub(crate) initial_conn_window_size: u32,
- pub(crate) initial_stream_window_size: u32,
- pub(crate) initial_max_send_streams: usize,
- pub(crate) max_frame_size: Option<u32>,
- pub(crate) max_header_list_size: u32,
- pub(crate) keep_alive_interval: Option<Duration>,
- pub(crate) keep_alive_timeout: Duration,
- pub(crate) keep_alive_while_idle: bool,
- pub(crate) max_concurrent_reset_streams: Option<usize>,
- pub(crate) max_send_buffer_size: usize,
- pub(crate) max_pending_accept_reset_streams: Option<usize>,
- pub(crate) header_table_size: Option<u32>,
- pub(crate) max_concurrent_streams: Option<u32>,
-}
-
-impl Default for Config {
- fn default() -> Config {
- Config {
- adaptive_window: false,
- initial_conn_window_size: DEFAULT_CONN_WINDOW,
- initial_stream_window_size: DEFAULT_STREAM_WINDOW,
- initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
- max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
- max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
- keep_alive_interval: None,
- keep_alive_timeout: Duration::from_secs(20),
- keep_alive_while_idle: false,
- max_concurrent_reset_streams: None,
- max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
- max_pending_accept_reset_streams: None,
- header_table_size: None,
- max_concurrent_streams: None,
- }
- }
-}
-
-fn new_builder(config: &Config) -> Builder {
- let mut builder = Builder::default();
- builder
- .initial_max_send_streams(config.initial_max_send_streams)
- .initial_window_size(config.initial_stream_window_size)
- .initial_connection_window_size(config.initial_conn_window_size)
- .max_header_list_size(config.max_header_list_size)
- .max_send_buffer_size(config.max_send_buffer_size)
- .enable_push(false);
- if let Some(max) = config.max_frame_size {
- builder.max_frame_size(max);
- }
- if let Some(max) = config.max_concurrent_reset_streams {
- builder.max_concurrent_reset_streams(max);
- }
- if let Some(max) = config.max_pending_accept_reset_streams {
- builder.max_pending_accept_reset_streams(max);
- }
- if let Some(size) = config.header_table_size {
- builder.header_table_size(size);
- }
- if let Some(max) = config.max_concurrent_streams {
- builder.max_concurrent_streams(max);
- }
- builder
-}
-
-fn new_ping_config(config: &Config) -> ping::Config {
- ping::Config {
- bdp_initial_window: if config.adaptive_window {
- Some(config.initial_stream_window_size)
- } else {
- None
- },
- keep_alive_interval: config.keep_alive_interval,
- keep_alive_timeout: config.keep_alive_timeout,
- keep_alive_while_idle: config.keep_alive_while_idle,
- }
-}
-
-pub(crate) async fn handshake<T, B, E>(
- io: T,
- req_rx: ClientRx<B>,
- config: &Config,
- mut exec: E,
- timer: Time,
-) -> crate::Result<ClientTask<B, E, T>>
-where
- T: Read + Write + Unpin,
- B: Body + 'static,
- B::Data: Send + 'static,
- E: Http2ClientConnExec<B, T> + Unpin,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
-{
- let (h2_tx, mut conn) = new_builder(config)
- .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
- .await
- .map_err(crate::Error::new_h2)?;
-
- // An mpsc channel is used entirely to detect when the
- // 'Client' has been dropped. This is to get around a bug
- // in h2 where dropping all SendRequests won't notify a
- // parked Connection.
- let (conn_drop_ref, rx) = mpsc::channel(1);
- let (cancel_tx, conn_eof) = oneshot::channel();
-
- let conn_drop_rx = rx.into_future();
-
- let ping_config = new_ping_config(config);
-
- let (conn, ping) = if ping_config.is_enabled() {
- let pp = conn.ping_pong().expect("conn.ping_pong");
- let (recorder, ponger) = ping::channel(pp, ping_config, timer);
-
- let conn: Conn<_, B> = Conn::new(ponger, conn);
- (Either::Left(conn), recorder)
- } else {
- (Either::Right(conn), ping::disabled())
- };
- let conn: ConnMapErr<T, B> = ConnMapErr {
- conn,
- is_terminated: false,
- };
-
- exec.execute_h2_future(H2ClientFuture::Task {
- task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
- });
-
- Ok(ClientTask {
- ping,
- conn_drop_ref,
- conn_eof,
- executor: exec,
- h2_tx,
- req_rx,
- fut_ctx: None,
- marker: PhantomData,
- })
-}
-
-pin_project! {
- struct Conn<T, B>
- where
- B: Body,
- {
- #[pin]
- ponger: Ponger,
- #[pin]
- conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
- }
-}
-
-impl<T, B> Conn<T, B>
-where
- B: Body,
- T: Read + Write + Unpin,
-{
- fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
- Conn { ponger, conn }
- }
-}
-
-impl<T, B> Future for Conn<T, B>
-where
- B: Body,
- T: Read + Write + Unpin,
-{
- type Output = Result<(), h2::Error>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut this = self.project();
- match this.ponger.poll(cx) {
- Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
- this.conn.set_target_window_size(wnd);
- this.conn.set_initial_window_size(wnd)?;
- }
- Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
- debug!("connection keep-alive timed out");
- return Poll::Ready(Ok(()));
- }
- Poll::Pending => {}
- }
-
- Pin::new(&mut this.conn).poll(cx)
- }
-}
-
-pin_project! {
- struct ConnMapErr<T, B>
- where
- B: Body,
- T: Read,
- T: Write,
- T: Unpin,
- {
- #[pin]
- conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
- #[pin]
- is_terminated: bool,
- }
-}
-
-impl<T, B> Future for ConnMapErr<T, B>
-where
- B: Body,
- T: Read + Write + Unpin,
-{
- type Output = Result<(), ()>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut this = self.project();
-
- if *this.is_terminated {
- return Poll::Pending;
- }
- let polled = this.conn.poll(cx);
- if polled.is_ready() {
- *this.is_terminated = true;
- }
- polled.map_err(|_e| {
- debug!(error = %_e, "connection error");
- })
- }
-}
-
-impl<T, B> FusedFuture for ConnMapErr<T, B>
-where
- B: Body,
- T: Read + Write + Unpin,
-{
- fn is_terminated(&self) -> bool {
- self.is_terminated
- }
-}
-
-pin_project! {
- pub struct ConnTask<T, B>
- where
- B: Body,
- T: Read,
- T: Write,
- T: Unpin,
- {
- #[pin]
- drop_rx: StreamFuture<Receiver<Infallible>>,
- #[pin]
- cancel_tx: Option<oneshot::Sender<Infallible>>,
- #[pin]
- conn: ConnMapErr<T, B>,
- }
-}
-
-impl<T, B> ConnTask<T, B>
-where
- B: Body,
- T: Read + Write + Unpin,
-{
- fn new(
- conn: ConnMapErr<T, B>,
- drop_rx: StreamFuture<Receiver<Infallible>>,
- cancel_tx: oneshot::Sender<Infallible>,
- ) -> Self {
- Self {
- drop_rx,
- cancel_tx: Some(cancel_tx),
- conn,
- }
- }
-}
-
-impl<T, B> Future for ConnTask<T, B>
-where
- B: Body,
- T: Read + Write + Unpin,
-{
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut this = self.project();
-
- if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
- // ok or err, the `conn` has finished.
- return Poll::Ready(());
- }
-
- if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
- // mpsc has been dropped, hopefully polling
- // the connection some more should start shutdown
- // and then close.
- trace!("send_request dropped, starting conn shutdown");
- drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
- }
-
- Poll::Pending
- }
-}
-
-pin_project! {
- #[project = H2ClientFutureProject]
- pub enum H2ClientFuture<B, T>
- where
- B: http_body::Body,
- B: 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- T: Read,
- T: Write,
- T: Unpin,
- {
- Pipe {
- #[pin]
- pipe: PipeMap<B>,
- },
- Send {
- #[pin]
- send_when: SendWhen<B>,
- },
- Task {
- #[pin]
- task: ConnTask<T, B>,
- },
- }
-}
-
-impl<B, T> Future for H2ClientFuture<B, T>
-where
- B: http_body::Body + 'static,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- T: Read + Write + Unpin,
-{
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
- let this = self.project();
-
- match this {
- H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
- H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
- H2ClientFutureProject::Task { task } => task.poll(cx),
- }
- }
-}
-
-struct FutCtx<B>
-where
- B: Body,
-{
- is_connect: bool,
- eos: bool,
- fut: ResponseFuture,
- body_tx: SendStream<SendBuf<B::Data>>,
- body: B,
- cb: Callback<Request<B>, Response<IncomingBody>>,
-}
-
-impl<B: Body> Unpin for FutCtx<B> {}
-
-pub(crate) struct ClientTask<B, E, T>
-where
- B: Body,
- E: Unpin,
-{
- ping: ping::Recorder,
- conn_drop_ref: ConnDropRef,
- conn_eof: ConnEof,
- executor: E,
- h2_tx: SendRequest<SendBuf<B::Data>>,
- req_rx: ClientRx<B>,
- fut_ctx: Option<FutCtx<B>>,
- marker: PhantomData<T>,
-}
-
-impl<B, E, T> ClientTask<B, E, T>
-where
- B: Body + 'static,
- E: Http2ClientConnExec<B, T> + Unpin,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- T: Read + Write + Unpin,
-{
- pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
- self.h2_tx.is_extended_connect_protocol_enabled()
- }
-}
-
-pin_project! {
- pub struct PipeMap<S>
- where
- S: Body,
- {
- #[pin]
- pipe: PipeToSendStream<S>,
- #[pin]
- conn_drop_ref: Option<Sender<Infallible>>,
- #[pin]
- ping: Option<Recorder>,
- }
-}
-
-impl<B> Future for PipeMap<B>
-where
- B: http_body::Body,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
-{
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
- let mut this = self.project();
-
- match this.pipe.poll_unpin(cx) {
- Poll::Ready(result) => {
- if let Err(_e) = result {
- debug!("client request body error: {}", _e);
- }
- drop(this.conn_drop_ref.take().expect("Future polled twice"));
- drop(this.ping.take().expect("Future polled twice"));
- return Poll::Ready(());
- }
- Poll::Pending => (),
- };
- Poll::Pending
- }
-}
-
-impl<B, E, T> ClientTask<B, E, T>
-where
- B: Body + 'static + Unpin,
- B::Data: Send,
- E: Http2ClientConnExec<B, T> + Unpin,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- T: Read + Write + Unpin,
-{
- fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
- let ping = self.ping.clone();
-
- let send_stream = if !f.is_connect {
- if !f.eos {
- let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
-
- // eagerly see if the body pipe is ready and
- // can thus skip allocating in the executor
- match Pin::new(&mut pipe).poll(cx) {
- Poll::Ready(_) => (),
- Poll::Pending => {
- let conn_drop_ref = self.conn_drop_ref.clone();
- // keep the ping recorder's knowledge of an
- // "open stream" alive while this body is
- // still sending...
- let ping = ping.clone();
-
- let pipe = PipeMap {
- pipe,
- conn_drop_ref: Some(conn_drop_ref),
- ping: Some(ping),
- };
- // Clear send task
- self.executor
- .execute_h2_future(H2ClientFuture::Pipe { pipe });
- }
- }
- }
-
- None
- } else {
- Some(f.body_tx)
- };
-
- self.executor.execute_h2_future(H2ClientFuture::Send {
- send_when: SendWhen {
- when: ResponseFutMap {
- fut: f.fut,
- ping: Some(ping),
- send_stream: Some(send_stream),
- },
- call_back: Some(f.cb),
- },
- });
- }
-}
-
-pin_project! {
- pub(crate) struct ResponseFutMap<B>
- where
- B: Body,
- B: 'static,
- {
- #[pin]
- fut: ResponseFuture,
- #[pin]
- ping: Option<Recorder>,
- #[pin]
- send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
- }
-}
-
-impl<B> Future for ResponseFutMap<B>
-where
- B: Body + 'static,
-{
- type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut this = self.project();
-
- let result = ready!(this.fut.poll(cx));
-
- let ping = this.ping.take().expect("Future polled twice");
- let send_stream = this.send_stream.take().expect("Future polled twice");
-
- match result {
- Ok(res) => {
- // record that we got the response headers
- ping.record_non_data();
-
- let content_length = headers::content_length_parse_all(res.headers());
- if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
- if content_length.map_or(false, |len| len != 0) {
- warn!("h2 connect response with non-zero body not supported");
-
- send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
- return Poll::Ready(Err((
- crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
- None::<Request<B>>,
- )));
- }
- let (parts, recv_stream) = res.into_parts();
- let mut res = Response::from_parts(parts, IncomingBody::empty());
-
- let (pending, on_upgrade) = crate::upgrade::pending();
- let io = H2Upgraded {
- ping,
- send_stream: unsafe { UpgradedSendStream::new(send_stream) },
- recv_stream,
- buf: Bytes::new(),
- };
- let upgraded = Upgraded::new(io, Bytes::new());
-
- pending.fulfill(upgraded);
- res.extensions_mut().insert(on_upgrade);
-
- Poll::Ready(Ok(res))
- } else {
- let res = res.map(|stream| {
- let ping = ping.for_stream(&stream);
- IncomingBody::h2(stream, content_length.into(), ping)
- });
- Poll::Ready(Ok(res))
- }
- }
- Err(err) => {
- ping.ensure_not_timed_out().map_err(|e| (e, None))?;
-
- debug!("client response error: {}", err);
- Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
- }
- }
- }
-}
-
-impl<B, E, T> Future for ClientTask<B, E, T>
-where
- B: Body + 'static + Unpin,
- B::Data: Send,
- B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
- E: Http2ClientConnExec<B, T> + Unpin,
- T: Read + Write + Unpin,
-{
- type Output = crate::Result<Dispatched>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- loop {
- match ready!(self.h2_tx.poll_ready(cx)) {
- Ok(()) => (),
- Err(err) => {
- self.ping.ensure_not_timed_out()?;
- return if err.reason() == Some(::h2::Reason::NO_ERROR) {
- trace!("connection gracefully shutdown");
- Poll::Ready(Ok(Dispatched::Shutdown))
- } else {
- Poll::Ready(Err(crate::Error::new_h2(err)))
- };
- }
- };
-
- // If we were waiting on pending open
- // continue where we left off.
- if let Some(f) = self.fut_ctx.take() {
- self.poll_pipe(f, cx);
- continue;
- }
-
- match self.req_rx.poll_recv(cx) {
- Poll::Ready(Some((req, cb))) => {
- // check that future hasn't been canceled already
- if cb.is_canceled() {
- trace!("request callback is canceled");
- continue;
- }
- let (head, body) = req.into_parts();
- let mut req = ::http::Request::from_parts(head, ());
- super::strip_connection_headers(req.headers_mut(), true);
- if let Some(len) = body.size_hint().exact() {
- if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
- headers::set_content_length_if_missing(req.headers_mut(), len);
- }
- }
-
- let is_connect = req.method() == Method::CONNECT;
- let eos = body.is_end_stream();
-
- if is_connect
- && headers::content_length_parse_all(req.headers())
- .map_or(false, |len| len != 0)
- {
- warn!("h2 connect request with non-zero body not supported");
- cb.send(Err(TrySendError {
- error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
- message: None,
- }));
- continue;
- }
-
- if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
- req.extensions_mut().insert(protocol.into_inner());
- }
-
- let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
- Ok(ok) => ok,
- Err(err) => {
- debug!("client send request error: {}", err);
- cb.send(Err(TrySendError {
- error: crate::Error::new_h2(err),
- message: None,
- }));
- continue;
- }
- };
-
- let f = FutCtx {
- is_connect,
- eos,
- fut,
- body_tx,
- body,
- cb,
- };
-
- // Check poll_ready() again.
- // If the call to send_request() resulted in the new stream being pending open,
- // we have to wait for the open to complete before accepting new requests.
- match self.h2_tx.poll_ready(cx) {
- Poll::Pending => {
- // Save Context
- self.fut_ctx = Some(f);
- return Poll::Pending;
- }
- Poll::Ready(Ok(())) => (),
- Poll::Ready(Err(err)) => {
- f.cb.send(Err(TrySendError {
- error: crate::Error::new_h2(err),
- message: None,
- }));
- continue;
- }
- }
- self.poll_pipe(f, cx);
- continue;
- }
-
- Poll::Ready(None) => {
- trace!("client::dispatch::Sender dropped");
- return Poll::Ready(Ok(Dispatched::Shutdown));
- }
-
- Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
- // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
- // But we cannot remove it as long as MSRV is less than that.
- #[allow(unused)]
- Ok(never) => match never {},
- Err(_conn_is_eof) => {
- trace!("connection task is closed, closing dispatch task");
- return Poll::Ready(Ok(Dispatched::Shutdown));
- }
- },
- }
- }
- }
-}
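
For reference, a minimal sketch of the bare `h2` client handshake that the deleted `client.rs` wrapped, assuming the `h2`, `http`, `bytes`, and `tokio` crates; the address is illustrative, and the window sizes mirror the defaults above:

use h2::client;
use http::Request;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let tcp = TcpStream::connect("127.0.0.1:8080").await?;
    let (send_request, connection) = client::Builder::new()
        .initial_window_size(1024 * 1024 * 2) // DEFAULT_STREAM_WINDOW above
        .initial_connection_window_size(1024 * 1024 * 5) // DEFAULT_CONN_WINDOW above
        .enable_push(false)
        .handshake::<_, bytes::Bytes>(tcp)
        .await?;
    // The connection future must be driven or no stream makes progress;
    // `ConnTask` above is hyper's richer version of this spawn, adding
    // drop-detection and keep-alive ping handling.
    tokio::spawn(async move {
        let _ = connection.await;
    });
    let mut send_request = send_request.ready().await?;
    let request = Request::builder().uri("http://127.0.0.1:8080/").body(())?;
    let (response, _send_stream) = send_request.send_request(request, true)?;
    println!("status: {}", response.await?.status());
    Ok(())
}
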
diff --git a/vendor/hyper/src/proto/h2/mod.rs b/vendor/hyper/src/proto/h2/mod.rs
deleted file mode 100644
index adb6de87..00000000
--- a/vendor/hyper/src/proto/h2/mod.rs
+++ /dev/null
@@ -1,446 +0,0 @@
-use std::error::Error as StdError;
-use std::future::Future;
-use std::io::{Cursor, IoSlice};
-use std::mem;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use bytes::{Buf, Bytes};
-use futures_util::ready;
-use h2::{Reason, RecvStream, SendStream};
-use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
-use http::HeaderMap;
-use pin_project_lite::pin_project;
-
-use crate::body::Body;
-use crate::proto::h2::ping::Recorder;
-use crate::rt::{Read, ReadBufCursor, Write};
-
-pub(crate) mod ping;
-
-cfg_client! {
- pub(crate) mod client;
- pub(crate) use self::client::ClientTask;
-}
-
-cfg_server! {
- pub(crate) mod server;
- pub(crate) use self::server::Server;
-}
-
-/// Default initial stream window size defined in HTTP2 spec.
-pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
-
-// List of connection headers from RFC 9110 Section 7.6.1
-//
-// TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
-// tested separately.
-static CONNECTION_HEADERS: [HeaderName; 4] = [
- HeaderName::from_static("keep-alive"),
- HeaderName::from_static("proxy-connection"),
- TRANSFER_ENCODING,
- UPGRADE,
-];
-
-fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
- for header in &CONNECTION_HEADERS {
- if headers.remove(header).is_some() {
- warn!("Connection header illegal in HTTP/2: {}", header.as_str());
- }
- }
-
- if is_request {
- if headers
- .get(TE)
- .map_or(false, |te_header| te_header != "trailers")
- {
- warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
- headers.remove(TE);
- }
- } else if headers.remove(TE).is_some() {
- warn!("TE headers illegal in HTTP/2 responses");
- }
-
- if let Some(header) = headers.remove(CONNECTION) {
- warn!(
- "Connection header illegal in HTTP/2: {}",
- CONNECTION.as_str()
- );
- let header_contents = header.to_str().unwrap();
-
- // A `Connection` header may have a comma-separated list of names of other headers that
- // are meant for only this specific connection.
- //
- // Iterate these names and remove them as headers. Connection-specific headers are
- // forbidden in HTTP2, as that information has been moved into frame types of the h2
- // protocol.
- for name in header_contents.split(',') {
- let name = name.trim();
- headers.remove(name);
- }
- }
-}
-
-// body adapters used by both Client and Server
-
-pin_project! {
- pub(crate) struct PipeToSendStream<S>
- where
- S: Body,
- {
- body_tx: SendStream<SendBuf<S::Data>>,
- data_done: bool,
- #[pin]
- stream: S,
- }
-}
-
-impl<S> PipeToSendStream<S>
-where
- S: Body,
-{
- fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
- PipeToSendStream {
- body_tx: tx,
- data_done: false,
- stream,
- }
- }
-}
-
-impl<S> Future for PipeToSendStream<S>
-where
- S: Body,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
-{
- type Output = crate::Result<()>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut me = self.project();
- loop {
- // we don't have the next chunk of data yet, so just reserve 1 byte to make
- // sure there's some capacity available. h2 will handle the capacity management
- // for the actual body chunk.
- me.body_tx.reserve_capacity(1);
-
- if me.body_tx.capacity() == 0 {
- loop {
- match ready!(me.body_tx.poll_capacity(cx)) {
- Some(Ok(0)) => {}
- Some(Ok(_)) => break,
- Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))),
- None => {
- // None means the stream is no longer in a
- // streaming state, we either finished it
- // somehow, or the remote reset us.
- return Poll::Ready(Err(crate::Error::new_body_write(
- "send stream capacity unexpectedly closed",
- )));
- }
- }
- }
- } else if let Poll::Ready(reason) = me
- .body_tx
- .poll_reset(cx)
- .map_err(crate::Error::new_body_write)?
- {
- debug!("stream received RST_STREAM: {:?}", reason);
- return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
- }
-
- match ready!(me.stream.as_mut().poll_frame(cx)) {
- Some(Ok(frame)) => {
- if frame.is_data() {
- let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
- let is_eos = me.stream.is_end_stream();
- trace!(
- "send body chunk: {} bytes, eos={}",
- chunk.remaining(),
- is_eos,
- );
-
- let buf = SendBuf::Buf(chunk);
- me.body_tx
- .send_data(buf, is_eos)
- .map_err(crate::Error::new_body_write)?;
-
- if is_eos {
- return Poll::Ready(Ok(()));
- }
- } else if frame.is_trailers() {
- // no more DATA, so give any capacity back
- me.body_tx.reserve_capacity(0);
- me.body_tx
- .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
- .map_err(crate::Error::new_body_write)?;
- return Poll::Ready(Ok(()));
- } else {
- trace!("discarding unknown frame");
- // loop again
- }
- }
- Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
- None => {
- // no more frames means we're done here
- // but at this point, we haven't sent an EOS DATA, or
- // any trailers, so send an empty EOS DATA.
- return Poll::Ready(me.body_tx.send_eos_frame());
- }
- }
- }
- }
-}
-
-trait SendStreamExt {
- fn on_user_err<E>(&mut self, err: E) -> crate::Error
- where
- E: Into<Box<dyn std::error::Error + Send + Sync>>;
- fn send_eos_frame(&mut self) -> crate::Result<()>;
-}
-
-impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
- fn on_user_err<E>(&mut self, err: E) -> crate::Error
- where
- E: Into<Box<dyn std::error::Error + Send + Sync>>,
- {
- let err = crate::Error::new_user_body(err);
- debug!("send body user stream error: {}", err);
- self.send_reset(err.h2_reason());
- err
- }
-
- fn send_eos_frame(&mut self) -> crate::Result<()> {
- trace!("send body eos");
- self.send_data(SendBuf::None, true)
- .map_err(crate::Error::new_body_write)
- }
-}
-
-#[repr(usize)]
-enum SendBuf<B> {
- Buf(B),
- Cursor(Cursor<Box<[u8]>>),
- None,
-}
-
-impl<B: Buf> Buf for SendBuf<B> {
- #[inline]
- fn remaining(&self) -> usize {
- match *self {
- Self::Buf(ref b) => b.remaining(),
- Self::Cursor(ref c) => Buf::remaining(c),
- Self::None => 0,
- }
- }
-
- #[inline]
- fn chunk(&self) -> &[u8] {
- match *self {
- Self::Buf(ref b) => b.chunk(),
- Self::Cursor(ref c) => c.chunk(),
- Self::None => &[],
- }
- }
-
- #[inline]
- fn advance(&mut self, cnt: usize) {
- match *self {
- Self::Buf(ref mut b) => b.advance(cnt),
- Self::Cursor(ref mut c) => c.advance(cnt),
- Self::None => {}
- }
- }
-
- fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
- match *self {
- Self::Buf(ref b) => b.chunks_vectored(dst),
- Self::Cursor(ref c) => c.chunks_vectored(dst),
- Self::None => 0,
- }
- }
-}
-
-struct H2Upgraded<B>
-where
- B: Buf,
-{
- ping: Recorder,
- send_stream: UpgradedSendStream<B>,
- recv_stream: RecvStream,
- buf: Bytes,
-}
-
-impl<B> Read for H2Upgraded<B>
-where
- B: Buf,
-{
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- mut read_buf: ReadBufCursor<'_>,
- ) -> Poll<Result<(), std::io::Error>> {
- if self.buf.is_empty() {
- self.buf = loop {
- match ready!(self.recv_stream.poll_data(cx)) {
- None => return Poll::Ready(Ok(())),
- Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
- continue
- }
- Some(Ok(buf)) => {
- self.ping.record_data(buf.len());
- break buf;
- }
- Some(Err(e)) => {
- return Poll::Ready(match e.reason() {
- Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
- Some(Reason::STREAM_CLOSED) => {
- Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
- }
- _ => Err(h2_to_io_error(e)),
- })
- }
- }
- };
- }
- let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
- read_buf.put_slice(&self.buf[..cnt]);
- self.buf.advance(cnt);
- let _ = self.recv_stream.flow_control().release_capacity(cnt);
- Poll::Ready(Ok(()))
- }
-}
-
-impl<B> Write for H2Upgraded<B>
-where
- B: Buf,
-{
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, std::io::Error>> {
- if buf.is_empty() {
- return Poll::Ready(Ok(0));
- }
- self.send_stream.reserve_capacity(buf.len());
-
- // We ignore all errors returned by `poll_capacity` and `write`, as we
- // will get the correct error from `poll_reset` anyway.
- let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
- None => Some(0),
- Some(Ok(cnt)) => self
- .send_stream
- .write(&buf[..cnt], false)
- .ok()
- .map(|()| cnt),
- Some(Err(_)) => None,
- };
-
- if let Some(cnt) = cnt {
- return Poll::Ready(Ok(cnt));
- }
-
- Poll::Ready(Err(h2_to_io_error(
- match ready!(self.send_stream.poll_reset(cx)) {
- Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
- return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
- }
- Ok(reason) => reason.into(),
- Err(e) => e,
- },
- )))
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), std::io::Error>> {
- if self.send_stream.write(&[], true).is_ok() {
- return Poll::Ready(Ok(()));
- }
-
- Poll::Ready(Err(h2_to_io_error(
- match ready!(self.send_stream.poll_reset(cx)) {
- Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
- Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
- return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
- }
- Ok(reason) => reason.into(),
- Err(e) => e,
- },
- )))
- }
-}
-
-fn h2_to_io_error(e: h2::Error) -> std::io::Error {
- if e.is_io() {
- e.into_io().unwrap()
- } else {
- std::io::Error::new(std::io::ErrorKind::Other, e)
- }
-}
-
-struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
-
-impl<B> UpgradedSendStream<B>
-where
- B: Buf,
-{
- unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
- assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
- Self(mem::transmute(inner))
- }
-
- fn reserve_capacity(&mut self, cnt: usize) {
- unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
- }
-
- fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
- unsafe { self.as_inner_unchecked().poll_capacity(cx) }
- }
-
- fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
- unsafe { self.as_inner_unchecked().poll_reset(cx) }
- }
-
- fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> {
- let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
- unsafe {
- self.as_inner_unchecked()
- .send_data(send_buf, end_of_stream)
- .map_err(h2_to_io_error)
- }
- }
-
- unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
- &mut *(&mut self.0 as *mut _ as *mut _)
- }
-}
-
-#[repr(transparent)]
-struct Neutered<B> {
- _inner: B,
- impossible: Impossible,
-}
-
-enum Impossible {}
-
-unsafe impl<B> Send for Neutered<B> {}
-
-impl<B> Buf for Neutered<B> {
- fn remaining(&self) -> usize {
- match self.impossible {}
- }
-
- fn chunk(&self) -> &[u8] {
- match self.impossible {}
- }
-
- fn advance(&mut self, _cnt: usize) {
- match self.impossible {}
- }
-}
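
As a self-contained illustration of the `Connection` header handling in `strip_connection_headers` above (a sketch using only the `http` crate; the `x-trace-id` header is made up):

use http::header::{HeaderMap, HeaderValue, CONNECTION};

fn main() {
    let mut headers = HeaderMap::new();
    headers.insert("x-trace-id", HeaderValue::from_static("abc123"));
    // `Connection` names other headers that apply only to this hop.
    headers.insert(CONNECTION, HeaderValue::from_static("x-trace-id"));

    // Both `Connection` itself and every header it names must be removed
    // before the message is serialized as HTTP/2 frames.
    if let Some(value) = headers.remove(CONNECTION) {
        if let Ok(names) = value.to_str() {
            for name in names.split(',') {
                headers.remove(name.trim());
            }
        }
    }
    assert!(headers.get("x-trace-id").is_none());
}
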
diff --git a/vendor/hyper/src/proto/h2/ping.rs b/vendor/hyper/src/proto/h2/ping.rs
deleted file mode 100644
index 749cf1b7..00000000
--- a/vendor/hyper/src/proto/h2/ping.rs
+++ /dev/null
@@ -1,509 +0,0 @@
-/// HTTP2 Ping usage
-///
-/// hyper uses HTTP2 pings for two purposes:
-///
-/// 1. Adaptive flow control using BDP
-/// 2. Connection keep-alive
-///
-/// Both cases are optional.
-///
-/// # BDP Algorithm
-///
-/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
-/// 1a. Record current time.
-/// 1b. Send a BDP ping.
-/// 2. Increment the number of received bytes.
-/// 3. When the BDP ping ack is received:
-/// 3a. Record duration from sent time.
-/// 3b. Merge RTT with a running average.
-/// 3c. Calculate bdp as bytes/rtt.
-/// 3d. If bdp is over 2/3 max, set new max to bdp and update windows.
-use std::fmt;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::task::{self, Poll};
-use std::time::{Duration, Instant};
-
-use h2::{Ping, PingPong};
-
-use crate::common::time::Time;
-use crate::rt::Sleep;
-
-type WindowSize = u32;
-
-pub(super) fn disabled() -> Recorder {
- Recorder { shared: None }
-}
-
-pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) {
- debug_assert!(
- config.is_enabled(),
- "ping channel requires bdp or keep-alive config",
- );
-
- let bdp = config.bdp_initial_window.map(|wnd| Bdp {
- bdp: wnd,
- max_bandwidth: 0.0,
- rtt: 0.0,
- ping_delay: Duration::from_millis(100),
- stable_count: 0,
- });
-
- let (bytes, next_bdp_at) = if bdp.is_some() {
- (Some(0), Some(Instant::now()))
- } else {
- (None, None)
- };
-
- let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
- interval,
- timeout: config.keep_alive_timeout,
- while_idle: config.keep_alive_while_idle,
- sleep: __timer.sleep(interval),
- state: KeepAliveState::Init,
- timer: __timer,
- });
-
- let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
-
- let shared = Arc::new(Mutex::new(Shared {
- bytes,
- last_read_at,
- is_keep_alive_timed_out: false,
- ping_pong,
- ping_sent_at: None,
- next_bdp_at,
- }));
-
- (
- Recorder {
- shared: Some(shared.clone()),
- },
- Ponger {
- bdp,
- keep_alive,
- shared,
- },
- )
-}
-
-#[derive(Clone)]
-pub(super) struct Config {
- pub(super) bdp_initial_window: Option<WindowSize>,
- /// If no frames are received in this amount of time, a PING frame is sent.
- pub(super) keep_alive_interval: Option<Duration>,
- /// After sending a keepalive PING, the connection will be closed if
- /// a pong is not received in this amount of time.
- pub(super) keep_alive_timeout: Duration,
- /// If true, sends pings even when there are no active streams.
- pub(super) keep_alive_while_idle: bool,
-}
-
-#[derive(Clone)]
-pub(crate) struct Recorder {
- shared: Option<Arc<Mutex<Shared>>>,
-}
-
-pub(super) struct Ponger {
- bdp: Option<Bdp>,
- keep_alive: Option<KeepAlive>,
- shared: Arc<Mutex<Shared>>,
-}
-
-struct Shared {
- ping_pong: PingPong,
- ping_sent_at: Option<Instant>,
-
- // bdp
- /// If `Some`, bdp is enabled, and this tracks how many bytes have been
- /// read during the current sample.
- bytes: Option<usize>,
- /// We delay a variable amount of time between BDP pings. This allows us
- /// to send fewer pings as the bandwidth stabilizes.
- next_bdp_at: Option<Instant>,
-
- // keep-alive
- /// If `Some`, keep-alive is enabled, and the Instant is how long ago
- /// the connection read the last frame.
- last_read_at: Option<Instant>,
-
- is_keep_alive_timed_out: bool,
-}
-
-struct Bdp {
- /// Current BDP in bytes
- bdp: u32,
- /// Largest bandwidth we've seen so far.
- max_bandwidth: f64,
- /// Round trip time in seconds
- rtt: f64,
- /// Delay the next ping by this amount.
- ///
- /// This will change depending on how stable the current bandwidth is.
- ping_delay: Duration,
- /// The count of ping round trips where BDP has stayed the same.
- stable_count: u32,
-}
-
-struct KeepAlive {
- /// If no frames are received in this amount of time, a PING frame is sent.
- interval: Duration,
- /// After sending a keepalive PING, the connection will be closed if
- /// a pong is not received in this amount of time.
- timeout: Duration,
- /// If true, sends pings even when there are no active streams.
- while_idle: bool,
- state: KeepAliveState,
- sleep: Pin<Box<dyn Sleep>>,
- timer: Time,
-}
-
-enum KeepAliveState {
- Init,
- Scheduled(Instant),
- PingSent,
-}
-
-pub(super) enum Ponged {
- SizeUpdate(WindowSize),
- KeepAliveTimedOut,
-}
-
-#[derive(Debug)]
-pub(super) struct KeepAliveTimedOut;
-
-// ===== impl Config =====
-
-impl Config {
- pub(super) fn is_enabled(&self) -> bool {
- self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
- }
-}
-
-// ===== impl Recorder =====
-
-impl Recorder {
- pub(crate) fn record_data(&self, len: usize) {
- let shared = if let Some(ref shared) = self.shared {
- shared
- } else {
- return;
- };
-
- let mut locked = shared.lock().unwrap();
-
- locked.update_last_read_at();
-
- // are we ready to send another bdp ping?
- // if not, we don't need to record bytes either
-
- if let Some(ref next_bdp_at) = locked.next_bdp_at {
- if Instant::now() < *next_bdp_at {
- return;
- } else {
- locked.next_bdp_at = None;
- }
- }
-
- if let Some(ref mut bytes) = locked.bytes {
- *bytes += len;
- } else {
- // no need to send bdp ping if bdp is disabled
- return;
- }
-
- if !locked.is_ping_sent() {
- locked.send_ping();
- }
- }
-
- pub(crate) fn record_non_data(&self) {
- let shared = if let Some(ref shared) = self.shared {
- shared
- } else {
- return;
- };
-
- let mut locked = shared.lock().unwrap();
-
- locked.update_last_read_at();
- }
-
- /// If the incoming stream is already closed, convert self into
- /// a disabled recorder.
- #[cfg(feature = "client")]
- pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
- if stream.is_end_stream() {
- disabled()
- } else {
- self
- }
- }
-
- pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
- if let Some(ref shared) = self.shared {
- let locked = shared.lock().unwrap();
- if locked.is_keep_alive_timed_out {
- return Err(KeepAliveTimedOut.crate_error());
- }
- }
-
- // else
- Ok(())
- }
-}
-
-// ===== impl Ponger =====
-
-impl Ponger {
- pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
- let now = Instant::now();
- let mut locked = self.shared.lock().unwrap();
- let is_idle = self.is_idle();
-
- if let Some(ref mut ka) = self.keep_alive {
- ka.maybe_schedule(is_idle, &locked);
- ka.maybe_ping(cx, is_idle, &mut locked);
- }
-
- if !locked.is_ping_sent() {
- // XXX: this doesn't register a waker...?
- return Poll::Pending;
- }
-
- match locked.ping_pong.poll_pong(cx) {
- Poll::Ready(Ok(_pong)) => {
- let start = locked
- .ping_sent_at
- .expect("pong received implies ping_sent_at");
- locked.ping_sent_at = None;
- let rtt = now - start;
- trace!("recv pong");
-
- if let Some(ref mut ka) = self.keep_alive {
- locked.update_last_read_at();
- ka.maybe_schedule(is_idle, &locked);
- ka.maybe_ping(cx, is_idle, &mut locked);
- }
-
- if let Some(ref mut bdp) = self.bdp {
- let bytes = locked.bytes.expect("bdp enabled implies bytes");
- locked.bytes = Some(0); // reset
- trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
-
- let update = bdp.calculate(bytes, rtt);
- locked.next_bdp_at = Some(now + bdp.ping_delay);
- if let Some(update) = update {
- return Poll::Ready(Ponged::SizeUpdate(update));
- }
- }
- }
- Poll::Ready(Err(_e)) => {
- debug!("pong error: {}", _e);
- }
- Poll::Pending => {
- if let Some(ref mut ka) = self.keep_alive {
- if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
- self.keep_alive = None;
- locked.is_keep_alive_timed_out = true;
- return Poll::Ready(Ponged::KeepAliveTimedOut);
- }
- }
- }
- }
-
- // XXX: this doesn't register a waker...?
- Poll::Pending
- }
-
- fn is_idle(&self) -> bool {
- Arc::strong_count(&self.shared) <= 2
- }
-}
-
-// ===== impl Shared =====
-
-impl Shared {
- fn send_ping(&mut self) {
- match self.ping_pong.send_ping(Ping::opaque()) {
- Ok(()) => {
- self.ping_sent_at = Some(Instant::now());
- trace!("sent ping");
- }
- Err(_err) => {
- debug!("error sending ping: {}", _err);
- }
- }
- }
-
- fn is_ping_sent(&self) -> bool {
- self.ping_sent_at.is_some()
- }
-
- fn update_last_read_at(&mut self) {
- if self.last_read_at.is_some() {
- self.last_read_at = Some(Instant::now());
- }
- }
-
- fn last_read_at(&self) -> Instant {
- self.last_read_at.expect("keep_alive expects last_read_at")
- }
-}
-
-// ===== impl Bdp =====
-
-/// Anything higher than this will likely be hitting TCP flow control.
-const BDP_LIMIT: usize = 1024 * 1024 * 16;
-
-impl Bdp {
- fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
- // No need to do any math if we're at the limit.
- if self.bdp as usize == BDP_LIMIT {
- self.stabilize_delay();
- return None;
- }
-
- // average the rtt
- let rtt = seconds(rtt);
- if self.rtt == 0.0 {
- // First sample means rtt is first rtt.
- self.rtt = rtt;
- } else {
- // Weigh this rtt as 1/8 for a moving average.
- self.rtt += (rtt - self.rtt) * 0.125;
- }
-
- // calculate the current bandwidth
- let bw = (bytes as f64) / (self.rtt * 1.5);
- trace!("current bandwidth = {:.1}B/s", bw);
-
- if bw < self.max_bandwidth {
- // not a faster bandwidth, so don't update
- self.stabilize_delay();
- return None;
- } else {
- self.max_bandwidth = bw;
- }
-
- // if the current `bytes` sample is at least 2/3 the previous
- // bdp, increase to double the current sample.
- if bytes >= self.bdp as usize * 2 / 3 {
- self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
- trace!("BDP increased to {}", self.bdp);
-
- self.stable_count = 0;
- self.ping_delay /= 2;
- Some(self.bdp)
- } else {
- self.stabilize_delay();
- None
- }
- }
-
- fn stabilize_delay(&mut self) {
- if self.ping_delay < Duration::from_secs(10) {
- self.stable_count += 1;
-
- if self.stable_count >= 2 {
- self.ping_delay *= 4;
- self.stable_count = 0;
- }
- }
- }
-}
-
-fn seconds(dur: Duration) -> f64 {
- const NANOS_PER_SEC: f64 = 1_000_000_000.0;
- let secs = dur.as_secs() as f64;
- secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
-}
-
-// ===== impl KeepAlive =====
-
-impl KeepAlive {
- fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) {
- match self.state {
- KeepAliveState::Init => {
- if !self.while_idle && is_idle {
- return;
- }
-
- self.schedule(shared);
- }
- KeepAliveState::PingSent => {
- if shared.is_ping_sent() {
- return;
- }
- self.schedule(shared);
- }
- KeepAliveState::Scheduled(..) => (),
- }
- }
-
- fn schedule(&mut self, shared: &Shared) {
- let interval = shared.last_read_at() + self.interval;
- self.state = KeepAliveState::Scheduled(interval);
- self.timer.reset(&mut self.sleep, interval);
- }
-
- fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) {
- match self.state {
- KeepAliveState::Scheduled(at) => {
- if Pin::new(&mut self.sleep).poll(cx).is_pending() {
- return;
- }
- // check if we've received a frame while we were scheduled
- if shared.last_read_at() + self.interval > at {
- self.state = KeepAliveState::Init;
- cx.waker().wake_by_ref(); // schedule us again
- return;
- }
- if !self.while_idle && is_idle {
- trace!("keep-alive no need to ping when idle and while_idle=false");
- return;
- }
- trace!("keep-alive interval ({:?}) reached", self.interval);
- shared.send_ping();
- self.state = KeepAliveState::PingSent;
- let timeout = Instant::now() + self.timeout;
- self.timer.reset(&mut self.sleep, timeout);
- }
- KeepAliveState::Init | KeepAliveState::PingSent => (),
- }
- }
-
- fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
- match self.state {
- KeepAliveState::PingSent => {
- if Pin::new(&mut self.sleep).poll(cx).is_pending() {
- return Ok(());
- }
- trace!("keep-alive timeout ({:?}) reached", self.timeout);
- Err(KeepAliveTimedOut)
- }
- KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()),
- }
- }
-}
-
-// ===== impl KeepAliveTimedOut =====
-
-impl KeepAliveTimedOut {
- pub(super) fn crate_error(self) -> crate::Error {
- crate::Error::new(crate::error::Kind::Http2).with(self)
- }
-}
-
-impl fmt::Display for KeepAliveTimedOut {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.write_str("keep-alive timed out")
- }
-}
-
-impl std::error::Error for KeepAliveTimedOut {
- fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
- Some(&crate::error::TimedOut)
- }
-}
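
A worked example of the BDP update rule from `Bdp::calculate` above, as plain Rust with illustrative numbers:

fn main() {
    const BDP_LIMIT: usize = 1024 * 1024 * 16;
    let mut bdp: u32 = 65_535; // start from the spec's initial window
    let bytes: usize = 500_000; // bytes received during one ping round trip
    let rtt: f64 = 0.080; // smoothed RTT in seconds

    // Bandwidth estimate, as in `calculate`:
    let bw = bytes as f64 / (rtt * 1.5);
    println!("current bandwidth = {:.1}B/s", bw); // ~4.2MB/s

    // The sample is at least 2/3 of the current BDP, so the window is
    // doubled from the sample (capped at BDP_LIMIT) and surfaced to the
    // connection via `Ponged::SizeUpdate`.
    if bytes >= bdp as usize * 2 / 3 {
        bdp = (bytes * 2).min(BDP_LIMIT) as u32;
    }
    assert_eq!(bdp, 1_000_000);
}
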
diff --git a/vendor/hyper/src/proto/h2/server.rs b/vendor/hyper/src/proto/h2/server.rs
deleted file mode 100644
index a8a20dd6..00000000
--- a/vendor/hyper/src/proto/h2/server.rs
+++ /dev/null
@@ -1,545 +0,0 @@
-use std::error::Error as StdError;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::time::Duration;
-
-use bytes::Bytes;
-use futures_util::ready;
-use h2::server::{Connection, Handshake, SendResponse};
-use h2::{Reason, RecvStream};
-use http::{Method, Request};
-use pin_project_lite::pin_project;
-
-use super::{ping, PipeToSendStream, SendBuf};
-use crate::body::{Body, Incoming as IncomingBody};
-use crate::common::date;
-use crate::common::io::Compat;
-use crate::common::time::Time;
-use crate::ext::Protocol;
-use crate::headers;
-use crate::proto::h2::ping::Recorder;
-use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
-use crate::proto::Dispatched;
-use crate::rt::bounds::Http2ServerConnExec;
-use crate::rt::{Read, Write};
-use crate::service::HttpService;
-
-use crate::upgrade::{OnUpgrade, Pending, Upgraded};
-use crate::Response;
-
-// Our defaults are chosen for the "majority" case, which usually is not
-// resource constrained, and so the spec default of 64kb can be too limiting
-// for performance.
-//
-// At the same time, a server more often has multiple clients connected, and
-// so is more likely to use more resources than a client would.
-const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb
-const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
-const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
-const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb
-const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb
-const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
-
-#[derive(Clone, Debug)]
-pub(crate) struct Config {
- pub(crate) adaptive_window: bool,
- pub(crate) initial_conn_window_size: u32,
- pub(crate) initial_stream_window_size: u32,
- pub(crate) max_frame_size: u32,
- pub(crate) enable_connect_protocol: bool,
- pub(crate) max_concurrent_streams: Option<u32>,
- pub(crate) max_pending_accept_reset_streams: Option<usize>,
- pub(crate) max_local_error_reset_streams: Option<usize>,
- pub(crate) keep_alive_interval: Option<Duration>,
- pub(crate) keep_alive_timeout: Duration,
- pub(crate) max_send_buffer_size: usize,
- pub(crate) max_header_list_size: u32,
- pub(crate) date_header: bool,
-}
-
-impl Default for Config {
- fn default() -> Config {
- Config {
- adaptive_window: false,
- initial_conn_window_size: DEFAULT_CONN_WINDOW,
- initial_stream_window_size: DEFAULT_STREAM_WINDOW,
- max_frame_size: DEFAULT_MAX_FRAME_SIZE,
- enable_connect_protocol: false,
- max_concurrent_streams: Some(200),
- max_pending_accept_reset_streams: None,
- max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
- keep_alive_interval: None,
- keep_alive_timeout: Duration::from_secs(20),
- max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
- max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
- date_header: true,
- }
- }
-}
-
-pin_project! {
- pub(crate) struct Server<T, S, B, E>
- where
- S: HttpService<IncomingBody>,
- B: Body,
- {
- exec: E,
- timer: Time,
- service: S,
- state: State<T, B>,
- date_header: bool,
- close_pending: bool
- }
-}
-
-enum State<T, B>
-where
- B: Body,
-{
- Handshaking {
- ping_config: ping::Config,
- hs: Handshake<Compat<T>, SendBuf<B::Data>>,
- },
- Serving(Serving<T, B>),
-}
-
-struct Serving<T, B>
-where
- B: Body,
-{
- ping: Option<(ping::Recorder, ping::Ponger)>,
- conn: Connection<Compat<T>, SendBuf<B::Data>>,
- closing: Option<crate::Error>,
- date_header: bool,
-}
-
-impl<T, S, B, E> Server<T, S, B, E>
-where
- T: Read + Write + Unpin,
- S: HttpService<IncomingBody, ResBody = B>,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- E: Http2ServerConnExec<S::Future, B>,
-{
- pub(crate) fn new(
- io: T,
- service: S,
- config: &Config,
- exec: E,
- timer: Time,
- ) -> Server<T, S, B, E> {
- let mut builder = h2::server::Builder::default();
- builder
- .initial_window_size(config.initial_stream_window_size)
- .initial_connection_window_size(config.initial_conn_window_size)
- .max_frame_size(config.max_frame_size)
- .max_header_list_size(config.max_header_list_size)
- .max_local_error_reset_streams(config.max_local_error_reset_streams)
- .max_send_buffer_size(config.max_send_buffer_size);
- if let Some(max) = config.max_concurrent_streams {
- builder.max_concurrent_streams(max);
- }
- if let Some(max) = config.max_pending_accept_reset_streams {
- builder.max_pending_accept_reset_streams(max);
- }
- if config.enable_connect_protocol {
- builder.enable_connect_protocol();
- }
- let handshake = builder.handshake(Compat::new(io));
-
- let bdp = if config.adaptive_window {
- Some(config.initial_stream_window_size)
- } else {
- None
- };
-
- let ping_config = ping::Config {
- bdp_initial_window: bdp,
- keep_alive_interval: config.keep_alive_interval,
- keep_alive_timeout: config.keep_alive_timeout,
- // If keep-alive is enabled for servers, it is always enabled while
- // idle, so the server can more aggressively close dead connections.
- keep_alive_while_idle: true,
- };
-
- Server {
- exec,
- timer,
- state: State::Handshaking {
- ping_config,
- hs: handshake,
- },
- service,
- date_header: config.date_header,
- close_pending: false,
- }
- }
-
- pub(crate) fn graceful_shutdown(&mut self) {
- trace!("graceful_shutdown");
- match self.state {
- State::Handshaking { .. } => {
- self.close_pending = true;
- }
- State::Serving(ref mut srv) => {
- if srv.closing.is_none() {
- srv.conn.graceful_shutdown();
- }
- }
- }
- }
-}
-
-impl<T, S, B, E> Future for Server<T, S, B, E>
-where
- T: Read + Write + Unpin,
- S: HttpService<IncomingBody, ResBody = B>,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- B: Body + 'static,
- E: Http2ServerConnExec<S::Future, B>,
-{
- type Output = crate::Result<Dispatched>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let me = &mut *self;
- loop {
- let next = match me.state {
- State::Handshaking {
- ref mut hs,
- ref ping_config,
- } => {
- let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
- let ping = if ping_config.is_enabled() {
- let pp = conn.ping_pong().expect("conn.ping_pong");
- Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
- } else {
- None
- };
- State::Serving(Serving {
- ping,
- conn,
- closing: None,
- date_header: me.date_header,
- })
- }
- State::Serving(ref mut srv) => {
- // graceful_shutdown was called before handshaking finished; start it now.
- if me.close_pending && srv.closing.is_none() {
- srv.conn.graceful_shutdown();
- }
- ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
- return Poll::Ready(Ok(Dispatched::Shutdown));
- }
- };
- me.state = next;
- }
- }
-}
-
-impl<T, B> Serving<T, B>
-where
- T: Read + Write + Unpin,
- B: Body + 'static,
-{
- fn poll_server<S, E>(
- &mut self,
- cx: &mut Context<'_>,
- service: &mut S,
- exec: &mut E,
- ) -> Poll<crate::Result<()>>
- where
- S: HttpService<IncomingBody, ResBody = B>,
- S::Error: Into<Box<dyn StdError + Send + Sync>>,
- E: Http2ServerConnExec<S::Future, B>,
- {
- if self.closing.is_none() {
- loop {
- self.poll_ping(cx);
-
- match ready!(self.conn.poll_accept(cx)) {
- Some(Ok((req, mut respond))) => {
- trace!("incoming request");
- let content_length = headers::content_length_parse_all(req.headers());
- let ping = self
- .ping
- .as_ref()
- .map(|ping| ping.0.clone())
- .unwrap_or_else(ping::disabled);
-
- // Record the headers received
- ping.record_non_data();
-
- let is_connect = req.method() == Method::CONNECT;
- let (mut parts, stream) = req.into_parts();
- let (mut req, connect_parts) = if !is_connect {
- (
- Request::from_parts(
- parts,
- IncomingBody::h2(stream, content_length.into(), ping),
- ),
- None,
- )
- } else {
- if content_length.map_or(false, |len| len != 0) {
- warn!("h2 connect request with non-zero body not supported");
- respond.send_reset(h2::Reason::INTERNAL_ERROR);
- return Poll::Ready(Ok(()));
- }
- let (pending, upgrade) = crate::upgrade::pending();
- debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
- parts.extensions.insert(upgrade);
- (
- Request::from_parts(parts, IncomingBody::empty()),
- Some(ConnectParts {
- pending,
- ping,
- recv_stream: stream,
- }),
- )
- };
-
- if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
- req.extensions_mut().insert(Protocol::from_inner(protocol));
- }
-
- let fut = H2Stream::new(
- service.call(req),
- connect_parts,
- respond,
- self.date_header,
- );
-
- exec.execute_h2stream(fut);
- }
- Some(Err(e)) => {
- return Poll::Ready(Err(crate::Error::new_h2(e)));
- }
- None => {
- // no more incoming streams...
- if let Some((ref ping, _)) = self.ping {
- ping.ensure_not_timed_out()?;
- }
-
- trace!("incoming connection complete");
- return Poll::Ready(Ok(()));
- }
- }
- }
- }
-
- debug_assert!(
- self.closing.is_some(),
- "poll_server broke loop without closing"
- );
-
- ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
-
- Poll::Ready(Err(self.closing.take().expect("polled after error")))
- }
-
- fn poll_ping(&mut self, cx: &mut Context<'_>) {
- if let Some((_, ref mut estimator)) = self.ping {
- match estimator.poll(cx) {
- Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
- self.conn.set_target_window_size(wnd);
- let _ = self.conn.set_initial_window_size(wnd);
- }
- Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
- debug!("keep-alive timed out, closing connection");
- self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
- }
- Poll::Pending => {}
- }
- }
- }
-}
-
-pin_project! {
- #[allow(missing_debug_implementations)]
- pub struct H2Stream<F, B>
- where
- B: Body,
- {
- reply: SendResponse<SendBuf<B::Data>>,
- #[pin]
- state: H2StreamState<F, B>,
- date_header: bool,
- }
-}
-
-pin_project! {
- #[project = H2StreamStateProj]
- enum H2StreamState<F, B>
- where
- B: Body,
- {
- Service {
- #[pin]
- fut: F,
- connect_parts: Option<ConnectParts>,
- },
- Body {
- #[pin]
- pipe: PipeToSendStream<B>,
- },
- }
-}
-
-struct ConnectParts {
- pending: Pending,
- ping: Recorder,
- recv_stream: RecvStream,
-}
-
-impl<F, B> H2Stream<F, B>
-where
- B: Body,
-{
- fn new(
- fut: F,
- connect_parts: Option<ConnectParts>,
- respond: SendResponse<SendBuf<B::Data>>,
- date_header: bool,
- ) -> H2Stream<F, B> {
- H2Stream {
- reply: respond,
- state: H2StreamState::Service { fut, connect_parts },
- date_header,
- }
- }
-}
-
-macro_rules! reply {
- ($me:expr, $res:expr, $eos:expr) => {{
- match $me.reply.send_response($res, $eos) {
- Ok(tx) => tx,
- Err(e) => {
- debug!("send response error: {}", e);
- $me.reply.send_reset(Reason::INTERNAL_ERROR);
- return Poll::Ready(Err(crate::Error::new_h2(e)));
- }
- }
- }};
-}
-
-impl<F, B, E> H2Stream<F, B>
-where
- F: Future<Output = Result<Response<B>, E>>,
- B: Body,
- B::Data: 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- E: Into<Box<dyn StdError + Send + Sync>>,
-{
- fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
- let mut me = self.project();
- loop {
- let next = match me.state.as_mut().project() {
- H2StreamStateProj::Service {
- fut: h,
- connect_parts,
- } => {
- let res = match h.poll(cx) {
- Poll::Ready(Ok(r)) => r,
- Poll::Pending => {
- // Response is not yet ready, so we want to check if the client has sent a
- // RST_STREAM frame which would cancel the current request.
- if let Poll::Ready(reason) =
- me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
- {
- debug!("stream received RST_STREAM: {:?}", reason);
- return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
- }
- return Poll::Pending;
- }
- Poll::Ready(Err(e)) => {
- let err = crate::Error::new_user_service(e);
- warn!("http2 service errored: {}", err);
- me.reply.send_reset(err.h2_reason());
- return Poll::Ready(Err(err));
- }
- };
-
- let (head, body) = res.into_parts();
- let mut res = ::http::Response::from_parts(head, ());
- super::strip_connection_headers(res.headers_mut(), false);
-
- // if instructed, set the Date header when it isn't already set
- if *me.date_header {
- res.headers_mut()
- .entry(::http::header::DATE)
- .or_insert_with(date::update_and_header_value);
- }
-
- if let Some(connect_parts) = connect_parts.take() {
- if res.status().is_success() {
- if headers::content_length_parse_all(res.headers())
- .map_or(false, |len| len != 0)
- {
- warn!("h2 successful response to CONNECT request with body not supported");
- me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
- return Poll::Ready(Err(crate::Error::new_user_header()));
- }
- if res
- .headers_mut()
- .remove(::http::header::CONTENT_LENGTH)
- .is_some()
- {
- warn!("successful response to CONNECT request disallows content-length header");
- }
- let send_stream = reply!(me, res, false);
- connect_parts.pending.fulfill(Upgraded::new(
- H2Upgraded {
- ping: connect_parts.ping,
- recv_stream: connect_parts.recv_stream,
- send_stream: unsafe { UpgradedSendStream::new(send_stream) },
- buf: Bytes::new(),
- },
- Bytes::new(),
- ));
- return Poll::Ready(Ok(()));
- }
- }
-
- if !body.is_end_stream() {
- // automatically set Content-Length from body...
- if let Some(len) = body.size_hint().exact() {
- headers::set_content_length_if_missing(res.headers_mut(), len);
- }
-
- let body_tx = reply!(me, res, false);
- H2StreamState::Body {
- pipe: PipeToSendStream::new(body, body_tx),
- }
- } else {
- reply!(me, res, true);
- return Poll::Ready(Ok(()));
- }
- }
- H2StreamStateProj::Body { pipe } => {
- return pipe.poll(cx);
- }
- };
- me.state.set(next);
- }
- }
-}
-
-impl<F, B, E> Future for H2Stream<F, B>
-where
- F: Future<Output = Result<Response<B>, E>>,
- B: Body,
- B::Data: 'static,
- B::Error: Into<Box<dyn StdError + Send + Sync>>,
- E: Into<Box<dyn StdError + Send + Sync>>,
-{
- type Output = ();
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- self.poll2(cx).map(|res| {
- if let Err(_e) = res {
- debug!("stream error: {}", _e);
- }
- })
- }
-}
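
Finally, a sketch of the response header fix-ups `poll2` above performs before replying, using only the `http` crate (hyper's internal `headers::set_content_length_if_missing` is mirrored here, not called):

use http::{header, HeaderValue, Response};

fn main() {
    let body = b"hello";
    let mut res = Response::new(());
    res.headers_mut()
        .insert(header::CONNECTION, HeaderValue::from_static("close"));

    // Connection-specific headers are illegal in HTTP/2 responses.
    res.headers_mut().remove(header::CONNECTION);

    // Set content-length from the body's exact size hint when missing,
    // as done before handing the body to `PipeToSendStream`.
    res.headers_mut()
        .entry(header::CONTENT_LENGTH)
        .or_insert_with(|| HeaderValue::from(body.len() as u64));

    assert_eq!(res.headers()[header::CONTENT_LENGTH], "5");
}
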