diff options
Diffstat (limited to 'vendor/hyper/src/proto/h2')
| -rw-r--r-- | vendor/hyper/src/proto/h2/client.rs | 749 | ||||
| -rw-r--r-- | vendor/hyper/src/proto/h2/mod.rs | 446 | ||||
| -rw-r--r-- | vendor/hyper/src/proto/h2/ping.rs | 509 | ||||
| -rw-r--r-- | vendor/hyper/src/proto/h2/server.rs | 545 |
4 files changed, 2249 insertions, 0 deletions
diff --git a/vendor/hyper/src/proto/h2/client.rs b/vendor/hyper/src/proto/h2/client.rs new file mode 100644 index 00000000..5e9641e4 --- /dev/null +++ b/vendor/hyper/src/proto/h2/client.rs @@ -0,0 +1,749 @@ +use std::{ + convert::Infallible, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use crate::rt::{Read, Write}; +use bytes::Bytes; +use futures_channel::mpsc::{Receiver, Sender}; +use futures_channel::{mpsc, oneshot}; +use futures_util::future::{Either, FusedFuture, FutureExt as _}; +use futures_util::ready; +use futures_util::stream::{StreamExt as _, StreamFuture}; +use h2::client::{Builder, Connection, SendRequest}; +use h2::SendStream; +use http::{Method, StatusCode}; +use pin_project_lite::pin_project; + +use super::ping::{Ponger, Recorder}; +use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; +use crate::body::{Body, Incoming as IncomingBody}; +use crate::client::dispatch::{Callback, SendWhen, TrySendError}; +use crate::common::io::Compat; +use crate::common::time::Time; +use crate::ext::Protocol; +use crate::headers; +use crate::proto::h2::UpgradedSendStream; +use crate::proto::Dispatched; +use crate::rt::bounds::Http2ClientConnExec; +use crate::upgrade::Upgraded; +use crate::{Request, Response}; +use h2::client::ResponseFuture; + +type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>; + +///// An mpsc channel is used to help notify the `Connection` task when *all* +///// other handles to it have been dropped, so that it can shutdown. +type ConnDropRef = mpsc::Sender<Infallible>; + +///// A oneshot channel watches the `Connection` task, and when it completes, +///// the "dispatch" task will be notified and can shutdown sooner. +type ConnEof = oneshot::Receiver<Infallible>; + +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb +const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb +const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb +const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb + +// The maximum number of concurrent streams that the client is allowed to open +// before it receives the initial SETTINGS frame from the server. +// This default value is derived from what the HTTP/2 spec recommends as the +// minimum value that endpoints advertise to their peers. It means that using +// this value will minimize the chance of the failure where the local endpoint +// attempts to open too many streams and gets rejected by the remote peer with +// the `REFUSED_STREAM` error. +const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100; + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, + pub(crate) initial_max_send_streams: usize, + pub(crate) max_frame_size: Option<u32>, + pub(crate) max_header_list_size: u32, + pub(crate) keep_alive_interval: Option<Duration>, + pub(crate) keep_alive_timeout: Duration, + pub(crate) keep_alive_while_idle: bool, + pub(crate) max_concurrent_reset_streams: Option<usize>, + pub(crate) max_send_buffer_size: usize, + pub(crate) max_pending_accept_reset_streams: Option<usize>, + pub(crate) header_table_size: Option<u32>, + pub(crate) max_concurrent_streams: Option<u32>, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS, + max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE), + max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE, + keep_alive_interval: None, + keep_alive_timeout: Duration::from_secs(20), + keep_alive_while_idle: false, + max_concurrent_reset_streams: None, + max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, + max_pending_accept_reset_streams: None, + header_table_size: None, + max_concurrent_streams: None, + } + } +} + +fn new_builder(config: &Config) -> Builder { + let mut builder = Builder::default(); + builder + .initial_max_send_streams(config.initial_max_send_streams) + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size) + .max_header_list_size(config.max_header_list_size) + .max_send_buffer_size(config.max_send_buffer_size) + .enable_push(false); + if let Some(max) = config.max_frame_size { + builder.max_frame_size(max); + } + if let Some(max) = config.max_concurrent_reset_streams { + builder.max_concurrent_reset_streams(max); + } + if let Some(max) = config.max_pending_accept_reset_streams { + builder.max_pending_accept_reset_streams(max); + } + if let Some(size) = config.header_table_size { + builder.header_table_size(size); + } + if let Some(max) = config.max_concurrent_streams { + builder.max_concurrent_streams(max); + } + builder +} + +fn new_ping_config(config: &Config) -> ping::Config { + ping::Config { + bdp_initial_window: if config.adaptive_window { + Some(config.initial_stream_window_size) + } else { + None + }, + keep_alive_interval: config.keep_alive_interval, + keep_alive_timeout: config.keep_alive_timeout, + keep_alive_while_idle: config.keep_alive_while_idle, + } +} + +pub(crate) async fn handshake<T, B, E>( + io: T, + req_rx: ClientRx<B>, + config: &Config, + mut exec: E, + timer: Time, +) -> crate::Result<ClientTask<B, E, T>> +where + T: Read + Write + Unpin, + B: Body + 'static, + B::Data: Send + 'static, + E: Http2ClientConnExec<B, T> + Unpin, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, +{ + let (h2_tx, mut conn) = new_builder(config) + .handshake::<_, SendBuf<B::Data>>(Compat::new(io)) + .await + .map_err(crate::Error::new_h2)?; + + // An mpsc channel is used entirely to detect when the + // 'Client' has been dropped. This is to get around a bug + // in h2 where dropping all SendRequests won't notify a + // parked Connection. + let (conn_drop_ref, rx) = mpsc::channel(1); + let (cancel_tx, conn_eof) = oneshot::channel(); + + let conn_drop_rx = rx.into_future(); + + let ping_config = new_ping_config(config); + + let (conn, ping) = if ping_config.is_enabled() { + let pp = conn.ping_pong().expect("conn.ping_pong"); + let (recorder, ponger) = ping::channel(pp, ping_config, timer); + + let conn: Conn<_, B> = Conn::new(ponger, conn); + (Either::Left(conn), recorder) + } else { + (Either::Right(conn), ping::disabled()) + }; + let conn: ConnMapErr<T, B> = ConnMapErr { + conn, + is_terminated: false, + }; + + exec.execute_h2_future(H2ClientFuture::Task { + task: ConnTask::new(conn, conn_drop_rx, cancel_tx), + }); + + Ok(ClientTask { + ping, + conn_drop_ref, + conn_eof, + executor: exec, + h2_tx, + req_rx, + fut_ctx: None, + marker: PhantomData, + }) +} + +pin_project! { + struct Conn<T, B> + where + B: Body, + { + #[pin] + ponger: Ponger, + #[pin] + conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>, + } +} + +impl<T, B> Conn<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self { + Conn { ponger, conn } + } +} + +impl<T, B> Future for Conn<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + type Output = Result<(), h2::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + match this.ponger.poll(cx) { + Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { + this.conn.set_target_window_size(wnd); + this.conn.set_initial_window_size(wnd)?; + } + Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { + debug!("connection keep-alive timed out"); + return Poll::Ready(Ok(())); + } + Poll::Pending => {} + } + + Pin::new(&mut this.conn).poll(cx) + } +} + +pin_project! { + struct ConnMapErr<T, B> + where + B: Body, + T: Read, + T: Write, + T: Unpin, + { + #[pin] + conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>, + #[pin] + is_terminated: bool, + } +} + +impl<T, B> Future for ConnMapErr<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + type Output = Result<(), ()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + if *this.is_terminated { + return Poll::Pending; + } + let polled = this.conn.poll(cx); + if polled.is_ready() { + *this.is_terminated = true; + } + polled.map_err(|_e| { + debug!(error = %_e, "connection error"); + }) + } +} + +impl<T, B> FusedFuture for ConnMapErr<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + fn is_terminated(&self) -> bool { + self.is_terminated + } +} + +pin_project! { + pub struct ConnTask<T, B> + where + B: Body, + T: Read, + T: Write, + T: Unpin, + { + #[pin] + drop_rx: StreamFuture<Receiver<Infallible>>, + #[pin] + cancel_tx: Option<oneshot::Sender<Infallible>>, + #[pin] + conn: ConnMapErr<T, B>, + } +} + +impl<T, B> ConnTask<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + fn new( + conn: ConnMapErr<T, B>, + drop_rx: StreamFuture<Receiver<Infallible>>, + cancel_tx: oneshot::Sender<Infallible>, + ) -> Self { + Self { + drop_rx, + cancel_tx: Some(cancel_tx), + conn, + } + } +} + +impl<T, B> Future for ConnTask<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() { + // ok or err, the `conn` has finished. + return Poll::Ready(()); + } + + if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close. + trace!("send_request dropped, starting conn shutdown"); + drop(this.cancel_tx.take().expect("ConnTask Future polled twice")); + } + + Poll::Pending + } +} + +pin_project! { + #[project = H2ClientFutureProject] + pub enum H2ClientFuture<B, T> + where + B: http_body::Body, + B: 'static, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read, + T: Write, + T: Unpin, + { + Pipe { + #[pin] + pipe: PipeMap<B>, + }, + Send { + #[pin] + send_when: SendWhen<B>, + }, + Task { + #[pin] + task: ConnTask<T, B>, + }, + } +} + +impl<B, T> Future for H2ClientFuture<B, T> +where + B: http_body::Body + 'static, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read + Write + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { + let this = self.project(); + + match this { + H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx), + H2ClientFutureProject::Send { send_when } => send_when.poll(cx), + H2ClientFutureProject::Task { task } => task.poll(cx), + } + } +} + +struct FutCtx<B> +where + B: Body, +{ + is_connect: bool, + eos: bool, + fut: ResponseFuture, + body_tx: SendStream<SendBuf<B::Data>>, + body: B, + cb: Callback<Request<B>, Response<IncomingBody>>, +} + +impl<B: Body> Unpin for FutCtx<B> {} + +pub(crate) struct ClientTask<B, E, T> +where + B: Body, + E: Unpin, +{ + ping: ping::Recorder, + conn_drop_ref: ConnDropRef, + conn_eof: ConnEof, + executor: E, + h2_tx: SendRequest<SendBuf<B::Data>>, + req_rx: ClientRx<B>, + fut_ctx: Option<FutCtx<B>>, + marker: PhantomData<T>, +} + +impl<B, E, T> ClientTask<B, E, T> +where + B: Body + 'static, + E: Http2ClientConnExec<B, T> + Unpin, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read + Write + Unpin, +{ + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.h2_tx.is_extended_connect_protocol_enabled() + } +} + +pin_project! { + pub struct PipeMap<S> + where + S: Body, + { + #[pin] + pipe: PipeToSendStream<S>, + #[pin] + conn_drop_ref: Option<Sender<Infallible>>, + #[pin] + ping: Option<Recorder>, + } +} + +impl<B> Future for PipeMap<B> +where + B: http_body::Body, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { + let mut this = self.project(); + + match this.pipe.poll_unpin(cx) { + Poll::Ready(result) => { + if let Err(_e) = result { + debug!("client request body error: {}", _e); + } + drop(this.conn_drop_ref.take().expect("Future polled twice")); + drop(this.ping.take().expect("Future polled twice")); + return Poll::Ready(()); + } + Poll::Pending => (), + }; + Poll::Pending + } +} + +impl<B, E, T> ClientTask<B, E, T> +where + B: Body + 'static + Unpin, + B::Data: Send, + E: Http2ClientConnExec<B, T> + Unpin, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read + Write + Unpin, +{ + fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) { + let ping = self.ping.clone(); + + let send_stream = if !f.is_connect { + if !f.eos { + let mut pipe = PipeToSendStream::new(f.body, f.body_tx); + + // eagerly see if the body pipe is ready and + // can thus skip allocating in the executor + match Pin::new(&mut pipe).poll(cx) { + Poll::Ready(_) => (), + Poll::Pending => { + let conn_drop_ref = self.conn_drop_ref.clone(); + // keep the ping recorder's knowledge of an + // "open stream" alive while this body is + // still sending... + let ping = ping.clone(); + + let pipe = PipeMap { + pipe, + conn_drop_ref: Some(conn_drop_ref), + ping: Some(ping), + }; + // Clear send task + self.executor + .execute_h2_future(H2ClientFuture::Pipe { pipe }); + } + } + } + + None + } else { + Some(f.body_tx) + }; + + self.executor.execute_h2_future(H2ClientFuture::Send { + send_when: SendWhen { + when: ResponseFutMap { + fut: f.fut, + ping: Some(ping), + send_stream: Some(send_stream), + }, + call_back: Some(f.cb), + }, + }); + } +} + +pin_project! { + pub(crate) struct ResponseFutMap<B> + where + B: Body, + B: 'static, + { + #[pin] + fut: ResponseFuture, + #[pin] + ping: Option<Recorder>, + #[pin] + send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>, + } +} + +impl<B> Future for ResponseFutMap<B> +where + B: Body + 'static, +{ + type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + let result = ready!(this.fut.poll(cx)); + + let ping = this.ping.take().expect("Future polled twice"); + let send_stream = this.send_stream.take().expect("Future polled twice"); + + match result { + Ok(res) => { + // record that we got the response headers + ping.record_non_data(); + + let content_length = headers::content_length_parse_all(res.headers()); + if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) { + if content_length.map_or(false, |len| len != 0) { + warn!("h2 connect response with non-zero body not supported"); + + send_stream.send_reset(h2::Reason::INTERNAL_ERROR); + return Poll::Ready(Err(( + crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + None::<Request<B>>, + ))); + } + let (parts, recv_stream) = res.into_parts(); + let mut res = Response::from_parts(parts, IncomingBody::empty()); + + let (pending, on_upgrade) = crate::upgrade::pending(); + let io = H2Upgraded { + ping, + send_stream: unsafe { UpgradedSendStream::new(send_stream) }, + recv_stream, + buf: Bytes::new(), + }; + let upgraded = Upgraded::new(io, Bytes::new()); + + pending.fulfill(upgraded); + res.extensions_mut().insert(on_upgrade); + + Poll::Ready(Ok(res)) + } else { + let res = res.map(|stream| { + let ping = ping.for_stream(&stream); + IncomingBody::h2(stream, content_length.into(), ping) + }); + Poll::Ready(Ok(res)) + } + } + Err(err) => { + ping.ensure_not_timed_out().map_err(|e| (e, None))?; + + debug!("client response error: {}", err); + Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>))) + } + } + } +} + +impl<B, E, T> Future for ClientTask<B, E, T> +where + B: Body + 'static + Unpin, + B::Data: Send, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + E: Http2ClientConnExec<B, T> + Unpin, + T: Read + Write + Unpin, +{ + type Output = crate::Result<Dispatched>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + loop { + match ready!(self.h2_tx.poll_ready(cx)) { + Ok(()) => (), + Err(err) => { + self.ping.ensure_not_timed_out()?; + return if err.reason() == Some(::h2::Reason::NO_ERROR) { + trace!("connection gracefully shutdown"); + Poll::Ready(Ok(Dispatched::Shutdown)) + } else { + Poll::Ready(Err(crate::Error::new_h2(err))) + }; + } + }; + + // If we were waiting on pending open + // continue where we left off. + if let Some(f) = self.fut_ctx.take() { + self.poll_pipe(f, cx); + continue; + } + + match self.req_rx.poll_recv(cx) { + Poll::Ready(Some((req, cb))) => { + // check that future hasn't been canceled already + if cb.is_canceled() { + trace!("request callback is canceled"); + continue; + } + let (head, body) = req.into_parts(); + let mut req = ::http::Request::from_parts(head, ()); + super::strip_connection_headers(req.headers_mut(), true); + if let Some(len) = body.size_hint().exact() { + if len != 0 || headers::method_has_defined_payload_semantics(req.method()) { + headers::set_content_length_if_missing(req.headers_mut(), len); + } + } + + let is_connect = req.method() == Method::CONNECT; + let eos = body.is_end_stream(); + + if is_connect + && headers::content_length_parse_all(req.headers()) + .map_or(false, |len| len != 0) + { + warn!("h2 connect request with non-zero body not supported"); + cb.send(Err(TrySendError { + error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + message: None, + })); + continue; + } + + if let Some(protocol) = req.extensions_mut().remove::<Protocol>() { + req.extensions_mut().insert(protocol.into_inner()); + } + + let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { + Ok(ok) => ok, + Err(err) => { + debug!("client send request error: {}", err); + cb.send(Err(TrySendError { + error: crate::Error::new_h2(err), + message: None, + })); + continue; + } + }; + + let f = FutCtx { + is_connect, + eos, + fut, + body_tx, + body, + cb, + }; + + // Check poll_ready() again. + // If the call to send_request() resulted in the new stream being pending open + // we have to wait for the open to complete before accepting new requests. + match self.h2_tx.poll_ready(cx) { + Poll::Pending => { + // Save Context + self.fut_ctx = Some(f); + return Poll::Pending; + } + Poll::Ready(Ok(())) => (), + Poll::Ready(Err(err)) => { + f.cb.send(Err(TrySendError { + error: crate::Error::new_h2(err), + message: None, + })); + continue; + } + } + self.poll_pipe(f, cx); + continue; + } + + Poll::Ready(None) => { + trace!("client::dispatch::Sender dropped"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + + Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { + // As of Rust 1.82, this pattern is no longer needed, and emits a warning. + // But we cannot remove it as long as MSRV is less than that. + #[allow(unused)] + Ok(never) => match never {}, + Err(_conn_is_eof) => { + trace!("connection task is closed, closing dispatch task"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + }, + } + } + } +} diff --git a/vendor/hyper/src/proto/h2/mod.rs b/vendor/hyper/src/proto/h2/mod.rs new file mode 100644 index 00000000..adb6de87 --- /dev/null +++ b/vendor/hyper/src/proto/h2/mod.rs @@ -0,0 +1,446 @@ +use std::error::Error as StdError; +use std::future::Future; +use std::io::{Cursor, IoSlice}; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::{Buf, Bytes}; +use futures_util::ready; +use h2::{Reason, RecvStream, SendStream}; +use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE}; +use http::HeaderMap; +use pin_project_lite::pin_project; + +use crate::body::Body; +use crate::proto::h2::ping::Recorder; +use crate::rt::{Read, ReadBufCursor, Write}; + +pub(crate) mod ping; + +cfg_client! { + pub(crate) mod client; + pub(crate) use self::client::ClientTask; +} + +cfg_server! { + pub(crate) mod server; + pub(crate) use self::server::Server; +} + +/// Default initial stream window size defined in HTTP2 spec. +pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535; + +// List of connection headers from RFC 9110 Section 7.6.1 +// +// TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're +// tested separately. +static CONNECTION_HEADERS: [HeaderName; 4] = [ + HeaderName::from_static("keep-alive"), + HeaderName::from_static("proxy-connection"), + TRANSFER_ENCODING, + UPGRADE, +]; + +fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { + for header in &CONNECTION_HEADERS { + if headers.remove(header).is_some() { + warn!("Connection header illegal in HTTP/2: {}", header.as_str()); + } + } + + if is_request { + if headers + .get(TE) + .map_or(false, |te_header| te_header != "trailers") + { + warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests"); + headers.remove(TE); + } + } else if headers.remove(TE).is_some() { + warn!("TE headers illegal in HTTP/2 responses"); + } + + if let Some(header) = headers.remove(CONNECTION) { + warn!( + "Connection header illegal in HTTP/2: {}", + CONNECTION.as_str() + ); + let header_contents = header.to_str().unwrap(); + + // A `Connection` header may have a comma-separated list of names of other headers that + // are meant for only this specific connection. + // + // Iterate these names and remove them as headers. Connection-specific headers are + // forbidden in HTTP2, as that information has been moved into frame types of the h2 + // protocol. + for name in header_contents.split(',') { + let name = name.trim(); + headers.remove(name); + } + } +} + +// body adapters used by both Client and Server + +pin_project! { + pub(crate) struct PipeToSendStream<S> + where + S: Body, + { + body_tx: SendStream<SendBuf<S::Data>>, + data_done: bool, + #[pin] + stream: S, + } +} + +impl<S> PipeToSendStream<S> +where + S: Body, +{ + fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> { + PipeToSendStream { + body_tx: tx, + data_done: false, + stream, + } + } +} + +impl<S> Future for PipeToSendStream<S> +where + S: Body, + S::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = crate::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + loop { + // we don't have the next chunk of data yet, so just reserve 1 byte to make + // sure there's some capacity available. h2 will handle the capacity management + // for the actual body chunk. + me.body_tx.reserve_capacity(1); + + if me.body_tx.capacity() == 0 { + loop { + match ready!(me.body_tx.poll_capacity(cx)) { + Some(Ok(0)) => {} + Some(Ok(_)) => break, + Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))), + None => { + // None means the stream is no longer in a + // streaming state, we either finished it + // somehow, or the remote reset us. + return Poll::Ready(Err(crate::Error::new_body_write( + "send stream capacity unexpectedly closed", + ))); + } + } + } + } else if let Poll::Ready(reason) = me + .body_tx + .poll_reset(cx) + .map_err(crate::Error::new_body_write)? + { + debug!("stream received RST_STREAM: {:?}", reason); + return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); + } + + match ready!(me.stream.as_mut().poll_frame(cx)) { + Some(Ok(frame)) => { + if frame.is_data() { + let chunk = frame.into_data().unwrap_or_else(|_| unreachable!()); + let is_eos = me.stream.is_end_stream(); + trace!( + "send body chunk: {} bytes, eos={}", + chunk.remaining(), + is_eos, + ); + + let buf = SendBuf::Buf(chunk); + me.body_tx + .send_data(buf, is_eos) + .map_err(crate::Error::new_body_write)?; + + if is_eos { + return Poll::Ready(Ok(())); + } + } else if frame.is_trailers() { + // no more DATA, so give any capacity back + me.body_tx.reserve_capacity(0); + me.body_tx + .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!())) + .map_err(crate::Error::new_body_write)?; + return Poll::Ready(Ok(())); + } else { + trace!("discarding unknown frame"); + // loop again + } + } + Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))), + None => { + // no more frames means we're done here + // but at this point, we haven't sent an EOS DATA, or + // any trailers, so send an empty EOS DATA. + return Poll::Ready(me.body_tx.send_eos_frame()); + } + } + } + } +} + +trait SendStreamExt { + fn on_user_err<E>(&mut self, err: E) -> crate::Error + where + E: Into<Box<dyn std::error::Error + Send + Sync>>; + fn send_eos_frame(&mut self) -> crate::Result<()>; +} + +impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> { + fn on_user_err<E>(&mut self, err: E) -> crate::Error + where + E: Into<Box<dyn std::error::Error + Send + Sync>>, + { + let err = crate::Error::new_user_body(err); + debug!("send body user stream error: {}", err); + self.send_reset(err.h2_reason()); + err + } + + fn send_eos_frame(&mut self) -> crate::Result<()> { + trace!("send body eos"); + self.send_data(SendBuf::None, true) + .map_err(crate::Error::new_body_write) + } +} + +#[repr(usize)] +enum SendBuf<B> { + Buf(B), + Cursor(Cursor<Box<[u8]>>), + None, +} + +impl<B: Buf> Buf for SendBuf<B> { + #[inline] + fn remaining(&self) -> usize { + match *self { + Self::Buf(ref b) => b.remaining(), + Self::Cursor(ref c) => Buf::remaining(c), + Self::None => 0, + } + } + + #[inline] + fn chunk(&self) -> &[u8] { + match *self { + Self::Buf(ref b) => b.chunk(), + Self::Cursor(ref c) => c.chunk(), + Self::None => &[], + } + } + + #[inline] + fn advance(&mut self, cnt: usize) { + match *self { + Self::Buf(ref mut b) => b.advance(cnt), + Self::Cursor(ref mut c) => c.advance(cnt), + Self::None => {} + } + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize { + match *self { + Self::Buf(ref b) => b.chunks_vectored(dst), + Self::Cursor(ref c) => c.chunks_vectored(dst), + Self::None => 0, + } + } +} + +struct H2Upgraded<B> +where + B: Buf, +{ + ping: Recorder, + send_stream: UpgradedSendStream<B>, + recv_stream: RecvStream, + buf: Bytes, +} + +impl<B> Read for H2Upgraded<B> +where + B: Buf, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut read_buf: ReadBufCursor<'_>, + ) -> Poll<Result<(), std::io::Error>> { + if self.buf.is_empty() { + self.buf = loop { + match ready!(self.recv_stream.poll_data(cx)) { + None => return Poll::Ready(Ok(())), + Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => { + continue + } + Some(Ok(buf)) => { + self.ping.record_data(buf.len()); + break buf; + } + Some(Err(e)) => { + return Poll::Ready(match e.reason() { + Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()), + Some(Reason::STREAM_CLOSED) => { + Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + } + _ => Err(h2_to_io_error(e)), + }) + } + } + }; + } + let cnt = std::cmp::min(self.buf.len(), read_buf.remaining()); + read_buf.put_slice(&self.buf[..cnt]); + self.buf.advance(cnt); + let _ = self.recv_stream.flow_control().release_capacity(cnt); + Poll::Ready(Ok(())) + } +} + +impl<B> Write for H2Upgraded<B> +where + B: Buf, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + self.send_stream.reserve_capacity(buf.len()); + + // We ignore all errors returned by `poll_capacity` and `write`, as we + // will get the correct from `poll_reset` anyway. + let cnt = match ready!(self.send_stream.poll_capacity(cx)) { + None => Some(0), + Some(Ok(cnt)) => self + .send_stream + .write(&buf[..cnt], false) + .ok() + .map(|()| cnt), + Some(Err(_)) => None, + }; + + if let Some(cnt) = cnt { + return Poll::Ready(Ok(cnt)); + } + + Poll::Ready(Err(h2_to_io_error( + match ready!(self.send_stream.poll_reset(cx)) { + Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { + return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) + } + Ok(reason) => reason.into(), + Err(e) => e, + }, + ))) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + if self.send_stream.write(&[], true).is_ok() { + return Poll::Ready(Ok(())); + } + + Poll::Ready(Err(h2_to_io_error( + match ready!(self.send_stream.poll_reset(cx)) { + Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())), + Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => { + return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())) + } + Ok(reason) => reason.into(), + Err(e) => e, + }, + ))) + } +} + +fn h2_to_io_error(e: h2::Error) -> std::io::Error { + if e.is_io() { + e.into_io().unwrap() + } else { + std::io::Error::new(std::io::ErrorKind::Other, e) + } +} + +struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>); + +impl<B> UpgradedSendStream<B> +where + B: Buf, +{ + unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self { + assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>()); + Self(mem::transmute(inner)) + } + + fn reserve_capacity(&mut self, cnt: usize) { + unsafe { self.as_inner_unchecked().reserve_capacity(cnt) } + } + + fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> { + unsafe { self.as_inner_unchecked().poll_capacity(cx) } + } + + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> { + unsafe { self.as_inner_unchecked().poll_reset(cx) } + } + + fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), std::io::Error> { + let send_buf = SendBuf::Cursor(Cursor::new(buf.into())); + unsafe { + self.as_inner_unchecked() + .send_data(send_buf, end_of_stream) + .map_err(h2_to_io_error) + } + } + + unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> { + &mut *(&mut self.0 as *mut _ as *mut _) + } +} + +#[repr(transparent)] +struct Neutered<B> { + _inner: B, + impossible: Impossible, +} + +enum Impossible {} + +unsafe impl<B> Send for Neutered<B> {} + +impl<B> Buf for Neutered<B> { + fn remaining(&self) -> usize { + match self.impossible {} + } + + fn chunk(&self) -> &[u8] { + match self.impossible {} + } + + fn advance(&mut self, _cnt: usize) { + match self.impossible {} + } +} diff --git a/vendor/hyper/src/proto/h2/ping.rs b/vendor/hyper/src/proto/h2/ping.rs new file mode 100644 index 00000000..749cf1b7 --- /dev/null +++ b/vendor/hyper/src/proto/h2/ping.rs @@ -0,0 +1,509 @@ +/// HTTP2 Ping usage +/// +/// hyper uses HTTP2 pings for two purposes: +/// +/// 1. Adaptive flow control using BDP +/// 2. Connection keep-alive +/// +/// Both cases are optional. +/// +/// # BDP Algorithm +/// +/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: +/// 1a. Record current time. +/// 1b. Send a BDP ping. +/// 2. Increment the number of received bytes. +/// 3. When the BDP ping ack is received: +/// 3a. Record duration from sent time. +/// 3b. Merge RTT with a running average. +/// 3c. Calculate bdp as bytes/rtt. +/// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{self, Poll}; +use std::time::{Duration, Instant}; + +use h2::{Ping, PingPong}; + +use crate::common::time::Time; +use crate::rt::Sleep; + +type WindowSize = u32; + +pub(super) fn disabled() -> Recorder { + Recorder { shared: None } +} + +pub(super) fn channel(ping_pong: PingPong, config: Config, __timer: Time) -> (Recorder, Ponger) { + debug_assert!( + config.is_enabled(), + "ping channel requires bdp or keep-alive config", + ); + + let bdp = config.bdp_initial_window.map(|wnd| Bdp { + bdp: wnd, + max_bandwidth: 0.0, + rtt: 0.0, + ping_delay: Duration::from_millis(100), + stable_count: 0, + }); + + let (bytes, next_bdp_at) = if bdp.is_some() { + (Some(0), Some(Instant::now())) + } else { + (None, None) + }; + + let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive { + interval, + timeout: config.keep_alive_timeout, + while_idle: config.keep_alive_while_idle, + sleep: __timer.sleep(interval), + state: KeepAliveState::Init, + timer: __timer, + }); + + let last_read_at = keep_alive.as_ref().map(|_| Instant::now()); + + let shared = Arc::new(Mutex::new(Shared { + bytes, + last_read_at, + is_keep_alive_timed_out: false, + ping_pong, + ping_sent_at: None, + next_bdp_at, + })); + + ( + Recorder { + shared: Some(shared.clone()), + }, + Ponger { + bdp, + keep_alive, + shared, + }, + ) +} + +#[derive(Clone)] +pub(super) struct Config { + pub(super) bdp_initial_window: Option<WindowSize>, + /// If no frames are received in this amount of time, a PING frame is sent. + pub(super) keep_alive_interval: Option<Duration>, + /// After sending a keepalive PING, the connection will be closed if + /// a pong is not received in this amount of time. + pub(super) keep_alive_timeout: Duration, + /// If true, sends pings even when there are no active streams. + pub(super) keep_alive_while_idle: bool, +} + +#[derive(Clone)] +pub(crate) struct Recorder { + shared: Option<Arc<Mutex<Shared>>>, +} + +pub(super) struct Ponger { + bdp: Option<Bdp>, + keep_alive: Option<KeepAlive>, + shared: Arc<Mutex<Shared>>, +} + +struct Shared { + ping_pong: PingPong, + ping_sent_at: Option<Instant>, + + // bdp + /// If `Some`, bdp is enabled, and this tracks how many bytes have been + /// read during the current sample. + bytes: Option<usize>, + /// We delay a variable amount of time between BDP pings. This allows us + /// to send less pings as the bandwidth stabilizes. + next_bdp_at: Option<Instant>, + + // keep-alive + /// If `Some`, keep-alive is enabled, and the Instant is how long ago + /// the connection read the last frame. + last_read_at: Option<Instant>, + + is_keep_alive_timed_out: bool, +} + +struct Bdp { + /// Current BDP in bytes + bdp: u32, + /// Largest bandwidth we've seen so far. + max_bandwidth: f64, + /// Round trip time in seconds + rtt: f64, + /// Delay the next ping by this amount. + /// + /// This will change depending on how stable the current bandwidth is. + ping_delay: Duration, + /// The count of ping round trips where BDP has stayed the same. + stable_count: u32, +} + +struct KeepAlive { + /// If no frames are received in this amount of time, a PING frame is sent. + interval: Duration, + /// After sending a keepalive PING, the connection will be closed if + /// a pong is not received in this amount of time. + timeout: Duration, + /// If true, sends pings even when there are no active streams. + while_idle: bool, + state: KeepAliveState, + sleep: Pin<Box<dyn Sleep>>, + timer: Time, +} + +enum KeepAliveState { + Init, + Scheduled(Instant), + PingSent, +} + +pub(super) enum Ponged { + SizeUpdate(WindowSize), + KeepAliveTimedOut, +} + +#[derive(Debug)] +pub(super) struct KeepAliveTimedOut; + +// ===== impl Config ===== + +impl Config { + pub(super) fn is_enabled(&self) -> bool { + self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some() + } +} + +// ===== impl Recorder ===== + +impl Recorder { + pub(crate) fn record_data(&self, len: usize) { + let shared = if let Some(ref shared) = self.shared { + shared + } else { + return; + }; + + let mut locked = shared.lock().unwrap(); + + locked.update_last_read_at(); + + // are we ready to send another bdp ping? + // if not, we don't need to record bytes either + + if let Some(ref next_bdp_at) = locked.next_bdp_at { + if Instant::now() < *next_bdp_at { + return; + } else { + locked.next_bdp_at = None; + } + } + + if let Some(ref mut bytes) = locked.bytes { + *bytes += len; + } else { + // no need to send bdp ping if bdp is disabled + return; + } + + if !locked.is_ping_sent() { + locked.send_ping(); + } + } + + pub(crate) fn record_non_data(&self) { + let shared = if let Some(ref shared) = self.shared { + shared + } else { + return; + }; + + let mut locked = shared.lock().unwrap(); + + locked.update_last_read_at(); + } + + /// If the incoming stream is already closed, convert self into + /// a disabled reporter. + #[cfg(feature = "client")] + pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self { + if stream.is_end_stream() { + disabled() + } else { + self + } + } + + pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> { + if let Some(ref shared) = self.shared { + let locked = shared.lock().unwrap(); + if locked.is_keep_alive_timed_out { + return Err(KeepAliveTimedOut.crate_error()); + } + } + + // else + Ok(()) + } +} + +// ===== impl Ponger ===== + +impl Ponger { + pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> { + let now = Instant::now(); + let mut locked = self.shared.lock().unwrap(); + let is_idle = self.is_idle(); + + if let Some(ref mut ka) = self.keep_alive { + ka.maybe_schedule(is_idle, &locked); + ka.maybe_ping(cx, is_idle, &mut locked); + } + + if !locked.is_ping_sent() { + // XXX: this doesn't register a waker...? + return Poll::Pending; + } + + match locked.ping_pong.poll_pong(cx) { + Poll::Ready(Ok(_pong)) => { + let start = locked + .ping_sent_at + .expect("pong received implies ping_sent_at"); + locked.ping_sent_at = None; + let rtt = now - start; + trace!("recv pong"); + + if let Some(ref mut ka) = self.keep_alive { + locked.update_last_read_at(); + ka.maybe_schedule(is_idle, &locked); + ka.maybe_ping(cx, is_idle, &mut locked); + } + + if let Some(ref mut bdp) = self.bdp { + let bytes = locked.bytes.expect("bdp enabled implies bytes"); + locked.bytes = Some(0); // reset + trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); + + let update = bdp.calculate(bytes, rtt); + locked.next_bdp_at = Some(now + bdp.ping_delay); + if let Some(update) = update { + return Poll::Ready(Ponged::SizeUpdate(update)); + } + } + } + Poll::Ready(Err(_e)) => { + debug!("pong error: {}", _e); + } + Poll::Pending => { + if let Some(ref mut ka) = self.keep_alive { + if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) { + self.keep_alive = None; + locked.is_keep_alive_timed_out = true; + return Poll::Ready(Ponged::KeepAliveTimedOut); + } + } + } + } + + // XXX: this doesn't register a waker...? + Poll::Pending + } + + fn is_idle(&self) -> bool { + Arc::strong_count(&self.shared) <= 2 + } +} + +// ===== impl Shared ===== + +impl Shared { + fn send_ping(&mut self) { + match self.ping_pong.send_ping(Ping::opaque()) { + Ok(()) => { + self.ping_sent_at = Some(Instant::now()); + trace!("sent ping"); + } + Err(_err) => { + debug!("error sending ping: {}", _err); + } + } + } + + fn is_ping_sent(&self) -> bool { + self.ping_sent_at.is_some() + } + + fn update_last_read_at(&mut self) { + if self.last_read_at.is_some() { + self.last_read_at = Some(Instant::now()); + } + } + + fn last_read_at(&self) -> Instant { + self.last_read_at.expect("keep_alive expects last_read_at") + } +} + +// ===== impl Bdp ===== + +/// Any higher than this likely will be hitting the TCP flow control. +const BDP_LIMIT: usize = 1024 * 1024 * 16; + +impl Bdp { + fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> { + // No need to do any math if we're at the limit. + if self.bdp as usize == BDP_LIMIT { + self.stabilize_delay(); + return None; + } + + // average the rtt + let rtt = seconds(rtt); + if self.rtt == 0.0 { + // First sample means rtt is first rtt. + self.rtt = rtt; + } else { + // Weigh this rtt as 1/8 for a moving average. + self.rtt += (rtt - self.rtt) * 0.125; + } + + // calculate the current bandwidth + let bw = (bytes as f64) / (self.rtt * 1.5); + trace!("current bandwidth = {:.1}B/s", bw); + + if bw < self.max_bandwidth { + // not a faster bandwidth, so don't update + self.stabilize_delay(); + return None; + } else { + self.max_bandwidth = bw; + } + + // if the current `bytes` sample is at least 2/3 the previous + // bdp, increase to double the current sample. + if bytes >= self.bdp as usize * 2 / 3 { + self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; + trace!("BDP increased to {}", self.bdp); + + self.stable_count = 0; + self.ping_delay /= 2; + Some(self.bdp) + } else { + self.stabilize_delay(); + None + } + } + + fn stabilize_delay(&mut self) { + if self.ping_delay < Duration::from_secs(10) { + self.stable_count += 1; + + if self.stable_count >= 2 { + self.ping_delay *= 4; + self.stable_count = 0; + } + } + } +} + +fn seconds(dur: Duration) -> f64 { + const NANOS_PER_SEC: f64 = 1_000_000_000.0; + let secs = dur.as_secs() as f64; + secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC +} + +// ===== impl KeepAlive ===== + +impl KeepAlive { + fn maybe_schedule(&mut self, is_idle: bool, shared: &Shared) { + match self.state { + KeepAliveState::Init => { + if !self.while_idle && is_idle { + return; + } + + self.schedule(shared); + } + KeepAliveState::PingSent => { + if shared.is_ping_sent() { + return; + } + self.schedule(shared); + } + KeepAliveState::Scheduled(..) => (), + } + } + + fn schedule(&mut self, shared: &Shared) { + let interval = shared.last_read_at() + self.interval; + self.state = KeepAliveState::Scheduled(interval); + self.timer.reset(&mut self.sleep, interval); + } + + fn maybe_ping(&mut self, cx: &mut task::Context<'_>, is_idle: bool, shared: &mut Shared) { + match self.state { + KeepAliveState::Scheduled(at) => { + if Pin::new(&mut self.sleep).poll(cx).is_pending() { + return; + } + // check if we've received a frame while we were scheduled + if shared.last_read_at() + self.interval > at { + self.state = KeepAliveState::Init; + cx.waker().wake_by_ref(); // schedule us again + return; + } + if !self.while_idle && is_idle { + trace!("keep-alive no need to ping when idle and while_idle=false"); + return; + } + trace!("keep-alive interval ({:?}) reached", self.interval); + shared.send_ping(); + self.state = KeepAliveState::PingSent; + let timeout = Instant::now() + self.timeout; + self.timer.reset(&mut self.sleep, timeout); + } + KeepAliveState::Init | KeepAliveState::PingSent => (), + } + } + + fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> { + match self.state { + KeepAliveState::PingSent => { + if Pin::new(&mut self.sleep).poll(cx).is_pending() { + return Ok(()); + } + trace!("keep-alive timeout ({:?}) reached", self.timeout); + Err(KeepAliveTimedOut) + } + KeepAliveState::Init | KeepAliveState::Scheduled(..) => Ok(()), + } + } +} + +// ===== impl KeepAliveTimedOut ===== + +impl KeepAliveTimedOut { + pub(super) fn crate_error(self) -> crate::Error { + crate::Error::new(crate::error::Kind::Http2).with(self) + } +} + +impl fmt::Display for KeepAliveTimedOut { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("keep-alive timed out") + } +} + +impl std::error::Error for KeepAliveTimedOut { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&crate::error::TimedOut) + } +} diff --git a/vendor/hyper/src/proto/h2/server.rs b/vendor/hyper/src/proto/h2/server.rs new file mode 100644 index 00000000..a8a20dd6 --- /dev/null +++ b/vendor/hyper/src/proto/h2/server.rs @@ -0,0 +1,545 @@ +use std::error::Error as StdError; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use bytes::Bytes; +use futures_util::ready; +use h2::server::{Connection, Handshake, SendResponse}; +use h2::{Reason, RecvStream}; +use http::{Method, Request}; +use pin_project_lite::pin_project; + +use super::{ping, PipeToSendStream, SendBuf}; +use crate::body::{Body, Incoming as IncomingBody}; +use crate::common::date; +use crate::common::io::Compat; +use crate::common::time::Time; +use crate::ext::Protocol; +use crate::headers; +use crate::proto::h2::ping::Recorder; +use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; +use crate::proto::Dispatched; +use crate::rt::bounds::Http2ServerConnExec; +use crate::rt::{Read, Write}; +use crate::service::HttpService; + +use crate::upgrade::{OnUpgrade, Pending, Upgraded}; +use crate::Response; + +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +// +// At the same time, a server more often has multiple clients connected, and +// so is more likely to use more resources than a client would. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb +const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb +const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb +const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb +const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024; + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, + pub(crate) max_frame_size: u32, + pub(crate) enable_connect_protocol: bool, + pub(crate) max_concurrent_streams: Option<u32>, + pub(crate) max_pending_accept_reset_streams: Option<usize>, + pub(crate) max_local_error_reset_streams: Option<usize>, + pub(crate) keep_alive_interval: Option<Duration>, + pub(crate) keep_alive_timeout: Duration, + pub(crate) max_send_buffer_size: usize, + pub(crate) max_header_list_size: u32, + pub(crate) date_header: bool, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + max_frame_size: DEFAULT_MAX_FRAME_SIZE, + enable_connect_protocol: false, + max_concurrent_streams: Some(200), + max_pending_accept_reset_streams: None, + max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS), + keep_alive_interval: None, + keep_alive_timeout: Duration::from_secs(20), + max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, + max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, + date_header: true, + } + } +} + +pin_project! { + pub(crate) struct Server<T, S, B, E> + where + S: HttpService<IncomingBody>, + B: Body, + { + exec: E, + timer: Time, + service: S, + state: State<T, B>, + date_header: bool, + close_pending: bool + } +} + +enum State<T, B> +where + B: Body, +{ + Handshaking { + ping_config: ping::Config, + hs: Handshake<Compat<T>, SendBuf<B::Data>>, + }, + Serving(Serving<T, B>), +} + +struct Serving<T, B> +where + B: Body, +{ + ping: Option<(ping::Recorder, ping::Ponger)>, + conn: Connection<Compat<T>, SendBuf<B::Data>>, + closing: Option<crate::Error>, + date_header: bool, +} + +impl<T, S, B, E> Server<T, S, B, E> +where + T: Read + Write + Unpin, + S: HttpService<IncomingBody, ResBody = B>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + B: Body + 'static, + E: Http2ServerConnExec<S::Future, B>, +{ + pub(crate) fn new( + io: T, + service: S, + config: &Config, + exec: E, + timer: Time, + ) -> Server<T, S, B, E> { + let mut builder = h2::server::Builder::default(); + builder + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size) + .max_frame_size(config.max_frame_size) + .max_header_list_size(config.max_header_list_size) + .max_local_error_reset_streams(config.max_local_error_reset_streams) + .max_send_buffer_size(config.max_send_buffer_size); + if let Some(max) = config.max_concurrent_streams { + builder.max_concurrent_streams(max); + } + if let Some(max) = config.max_pending_accept_reset_streams { + builder.max_pending_accept_reset_streams(max); + } + if config.enable_connect_protocol { + builder.enable_connect_protocol(); + } + let handshake = builder.handshake(Compat::new(io)); + + let bdp = if config.adaptive_window { + Some(config.initial_stream_window_size) + } else { + None + }; + + let ping_config = ping::Config { + bdp_initial_window: bdp, + keep_alive_interval: config.keep_alive_interval, + keep_alive_timeout: config.keep_alive_timeout, + // If keep-alive is enabled for servers, always enabled while + // idle, so it can more aggressively close dead connections. + keep_alive_while_idle: true, + }; + + Server { + exec, + timer, + state: State::Handshaking { + ping_config, + hs: handshake, + }, + service, + date_header: config.date_header, + close_pending: false, + } + } + + pub(crate) fn graceful_shutdown(&mut self) { + trace!("graceful_shutdown"); + match self.state { + State::Handshaking { .. } => { + self.close_pending = true; + } + State::Serving(ref mut srv) => { + if srv.closing.is_none() { + srv.conn.graceful_shutdown(); + } + } + } + } +} + +impl<T, S, B, E> Future for Server<T, S, B, E> +where + T: Read + Write + Unpin, + S: HttpService<IncomingBody, ResBody = B>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + B: Body + 'static, + E: Http2ServerConnExec<S::Future, B>, +{ + type Output = crate::Result<Dispatched>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = &mut *self; + loop { + let next = match me.state { + State::Handshaking { + ref mut hs, + ref ping_config, + } => { + let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?; + let ping = if ping_config.is_enabled() { + let pp = conn.ping_pong().expect("conn.ping_pong"); + Some(ping::channel(pp, ping_config.clone(), me.timer.clone())) + } else { + None + }; + State::Serving(Serving { + ping, + conn, + closing: None, + date_header: me.date_header, + }) + } + State::Serving(ref mut srv) => { + // graceful_shutdown was called before handshaking finished, + if me.close_pending && srv.closing.is_none() { + srv.conn.graceful_shutdown(); + } + ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + }; + me.state = next; + } + } +} + +impl<T, B> Serving<T, B> +where + T: Read + Write + Unpin, + B: Body + 'static, +{ + fn poll_server<S, E>( + &mut self, + cx: &mut Context<'_>, + service: &mut S, + exec: &mut E, + ) -> Poll<crate::Result<()>> + where + S: HttpService<IncomingBody, ResBody = B>, + S::Error: Into<Box<dyn StdError + Send + Sync>>, + E: Http2ServerConnExec<S::Future, B>, + { + if self.closing.is_none() { + loop { + self.poll_ping(cx); + + match ready!(self.conn.poll_accept(cx)) { + Some(Ok((req, mut respond))) => { + trace!("incoming request"); + let content_length = headers::content_length_parse_all(req.headers()); + let ping = self + .ping + .as_ref() + .map(|ping| ping.0.clone()) + .unwrap_or_else(ping::disabled); + + // Record the headers received + ping.record_non_data(); + + let is_connect = req.method() == Method::CONNECT; + let (mut parts, stream) = req.into_parts(); + let (mut req, connect_parts) = if !is_connect { + ( + Request::from_parts( + parts, + IncomingBody::h2(stream, content_length.into(), ping), + ), + None, + ) + } else { + if content_length.map_or(false, |len| len != 0) { + warn!("h2 connect request with non-zero body not supported"); + respond.send_reset(h2::Reason::INTERNAL_ERROR); + return Poll::Ready(Ok(())); + } + let (pending, upgrade) = crate::upgrade::pending(); + debug_assert!(parts.extensions.get::<OnUpgrade>().is_none()); + parts.extensions.insert(upgrade); + ( + Request::from_parts(parts, IncomingBody::empty()), + Some(ConnectParts { + pending, + ping, + recv_stream: stream, + }), + ) + }; + + if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() { + req.extensions_mut().insert(Protocol::from_inner(protocol)); + } + + let fut = H2Stream::new( + service.call(req), + connect_parts, + respond, + self.date_header, + ); + + exec.execute_h2stream(fut); + } + Some(Err(e)) => { + return Poll::Ready(Err(crate::Error::new_h2(e))); + } + None => { + // no more incoming streams... + if let Some((ref ping, _)) = self.ping { + ping.ensure_not_timed_out()?; + } + + trace!("incoming connection complete"); + return Poll::Ready(Ok(())); + } + } + } + } + + debug_assert!( + self.closing.is_some(), + "poll_server broke loop without closing" + ); + + ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?; + + Poll::Ready(Err(self.closing.take().expect("polled after error"))) + } + + fn poll_ping(&mut self, cx: &mut Context<'_>) { + if let Some((_, ref mut estimator)) = self.ping { + match estimator.poll(cx) { + Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { + self.conn.set_target_window_size(wnd); + let _ = self.conn.set_initial_window_size(wnd); + } + Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { + debug!("keep-alive timed out, closing connection"); + self.conn.abrupt_shutdown(h2::Reason::NO_ERROR); + } + Poll::Pending => {} + } + } + } +} + +pin_project! { + #[allow(missing_debug_implementations)] + pub struct H2Stream<F, B> + where + B: Body, + { + reply: SendResponse<SendBuf<B::Data>>, + #[pin] + state: H2StreamState<F, B>, + date_header: bool, + } +} + +pin_project! { + #[project = H2StreamStateProj] + enum H2StreamState<F, B> + where + B: Body, + { + Service { + #[pin] + fut: F, + connect_parts: Option<ConnectParts>, + }, + Body { + #[pin] + pipe: PipeToSendStream<B>, + }, + } +} + +struct ConnectParts { + pending: Pending, + ping: Recorder, + recv_stream: RecvStream, +} + +impl<F, B> H2Stream<F, B> +where + B: Body, +{ + fn new( + fut: F, + connect_parts: Option<ConnectParts>, + respond: SendResponse<SendBuf<B::Data>>, + date_header: bool, + ) -> H2Stream<F, B> { + H2Stream { + reply: respond, + state: H2StreamState::Service { fut, connect_parts }, + date_header, + } + } +} + +macro_rules! reply { + ($me:expr, $res:expr, $eos:expr) => {{ + match $me.reply.send_response($res, $eos) { + Ok(tx) => tx, + Err(e) => { + debug!("send response error: {}", e); + $me.reply.send_reset(Reason::INTERNAL_ERROR); + return Poll::Ready(Err(crate::Error::new_h2(e))); + } + } + }}; +} + +impl<F, B, E> H2Stream<F, B> +where + F: Future<Output = Result<Response<B>, E>>, + B: Body, + B::Data: 'static, + B::Error: Into<Box<dyn StdError + Send + Sync>>, + E: Into<Box<dyn StdError + Send + Sync>>, +{ + fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { + let mut me = self.project(); + loop { + let next = match me.state.as_mut().project() { + H2StreamStateProj::Service { + fut: h, + connect_parts, + } => { + let res = match h.poll(cx) { + Poll::Ready(Ok(r)) => r, + Poll::Pending => { + // Response is not yet ready, so we want to check if the client has sent a + // RST_STREAM frame which would cancel the current request. + if let Poll::Ready(reason) = + me.reply.poll_reset(cx).map_err(crate::Error::new_h2)? + { + debug!("stream received RST_STREAM: {:?}", reason); + return Poll::Ready(Err(crate::Error::new_h2(reason.into()))); + } + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + let err = crate::Error::new_user_service(e); + warn!("http2 service errored: {}", err); + me.reply.send_reset(err.h2_reason()); + return Poll::Ready(Err(err)); + } + }; + + let (head, body) = res.into_parts(); + let mut res = ::http::Response::from_parts(head, ()); + super::strip_connection_headers(res.headers_mut(), false); + + // set Date header if it isn't already set if instructed + if *me.date_header { + res.headers_mut() + .entry(::http::header::DATE) + .or_insert_with(date::update_and_header_value); + } + + if let Some(connect_parts) = connect_parts.take() { + if res.status().is_success() { + if headers::content_length_parse_all(res.headers()) + .map_or(false, |len| len != 0) + { + warn!("h2 successful response to CONNECT request with body not supported"); + me.reply.send_reset(h2::Reason::INTERNAL_ERROR); + return Poll::Ready(Err(crate::Error::new_user_header())); + } + if res + .headers_mut() + .remove(::http::header::CONTENT_LENGTH) + .is_some() + { + warn!("successful response to CONNECT request disallows content-length header"); + } + let send_stream = reply!(me, res, false); + connect_parts.pending.fulfill(Upgraded::new( + H2Upgraded { + ping: connect_parts.ping, + recv_stream: connect_parts.recv_stream, + send_stream: unsafe { UpgradedSendStream::new(send_stream) }, + buf: Bytes::new(), + }, + Bytes::new(), + )); + return Poll::Ready(Ok(())); + } + } + + if !body.is_end_stream() { + // automatically set Content-Length from body... + if let Some(len) = body.size_hint().exact() { + headers::set_content_length_if_missing(res.headers_mut(), len); + } + + let body_tx = reply!(me, res, false); + H2StreamState::Body { + pipe: PipeToSendStream::new(body, body_tx), + } + } else { + reply!(me, res, true); + return Poll::Ready(Ok(())); + } + } + H2StreamStateProj::Body { pipe } => { + return pipe.poll(cx); + } + }; + me.state.set(next); + } + } +} + +impl<F, B, E> Future for H2Stream<F, B> +where + F: Future<Output = Result<Response<B>, E>>, + B: Body, + B::Data: 'static, + B::Error: Into<Box<dyn StdError + Send + Sync>>, + E: Into<Box<dyn StdError + Send + Sync>>, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.poll2(cx).map(|res| { + if let Err(_e) = res { + debug!("stream error: {}", _e); + } + }) + } +} |
