summaryrefslogtreecommitdiff
path: root/vendor/hyper/src/proto/h2/server.rs
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
committermo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
commit8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch)
tree22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/hyper/src/proto/h2/server.rs
parent4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff)
chore: add vendor directory
Diffstat (limited to 'vendor/hyper/src/proto/h2/server.rs')
-rw-r--r--vendor/hyper/src/proto/h2/server.rs545
1 files changed, 545 insertions, 0 deletions
diff --git a/vendor/hyper/src/proto/h2/server.rs b/vendor/hyper/src/proto/h2/server.rs
new file mode 100644
index 00000000..a8a20dd6
--- /dev/null
+++ b/vendor/hyper/src/proto/h2/server.rs
@@ -0,0 +1,545 @@
+use std::error::Error as StdError;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use bytes::Bytes;
+use futures_util::ready;
+use h2::server::{Connection, Handshake, SendResponse};
+use h2::{Reason, RecvStream};
+use http::{Method, Request};
+use pin_project_lite::pin_project;
+
+use super::{ping, PipeToSendStream, SendBuf};
+use crate::body::{Body, Incoming as IncomingBody};
+use crate::common::date;
+use crate::common::io::Compat;
+use crate::common::time::Time;
+use crate::ext::Protocol;
+use crate::headers;
+use crate::proto::h2::ping::Recorder;
+use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
+use crate::proto::Dispatched;
+use crate::rt::bounds::Http2ServerConnExec;
+use crate::rt::{Read, Write};
+use crate::service::HttpService;
+
+use crate::upgrade::{OnUpgrade, Pending, Upgraded};
+use crate::Response;
+
+// Our defaults are chosen for the "majority" case, which usually are not
+// resource constrained, and so the spec default of 64kb can be too limiting
+// for performance.
+//
+// At the same time, a server more often has multiple clients connected, and
+// so is more likely to use more resources than a client would.
+const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb
+const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
+const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
+const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb
+const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb
+const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
+
+#[derive(Clone, Debug)]
+pub(crate) struct Config {
+ pub(crate) adaptive_window: bool,
+ pub(crate) initial_conn_window_size: u32,
+ pub(crate) initial_stream_window_size: u32,
+ pub(crate) max_frame_size: u32,
+ pub(crate) enable_connect_protocol: bool,
+ pub(crate) max_concurrent_streams: Option<u32>,
+ pub(crate) max_pending_accept_reset_streams: Option<usize>,
+ pub(crate) max_local_error_reset_streams: Option<usize>,
+ pub(crate) keep_alive_interval: Option<Duration>,
+ pub(crate) keep_alive_timeout: Duration,
+ pub(crate) max_send_buffer_size: usize,
+ pub(crate) max_header_list_size: u32,
+ pub(crate) date_header: bool,
+}
+
+impl Default for Config {
+ fn default() -> Config {
+ Config {
+ adaptive_window: false,
+ initial_conn_window_size: DEFAULT_CONN_WINDOW,
+ initial_stream_window_size: DEFAULT_STREAM_WINDOW,
+ max_frame_size: DEFAULT_MAX_FRAME_SIZE,
+ enable_connect_protocol: false,
+ max_concurrent_streams: Some(200),
+ max_pending_accept_reset_streams: None,
+ max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
+ keep_alive_interval: None,
+ keep_alive_timeout: Duration::from_secs(20),
+ max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
+ max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
+ date_header: true,
+ }
+ }
+}
+
+pin_project! {
+ pub(crate) struct Server<T, S, B, E>
+ where
+ S: HttpService<IncomingBody>,
+ B: Body,
+ {
+ exec: E,
+ timer: Time,
+ service: S,
+ state: State<T, B>,
+ date_header: bool,
+ close_pending: bool
+ }
+}
+
+enum State<T, B>
+where
+ B: Body,
+{
+ Handshaking {
+ ping_config: ping::Config,
+ hs: Handshake<Compat<T>, SendBuf<B::Data>>,
+ },
+ Serving(Serving<T, B>),
+}
+
+struct Serving<T, B>
+where
+ B: Body,
+{
+ ping: Option<(ping::Recorder, ping::Ponger)>,
+ conn: Connection<Compat<T>, SendBuf<B::Data>>,
+ closing: Option<crate::Error>,
+ date_header: bool,
+}
+
+impl<T, S, B, E> Server<T, S, B, E>
+where
+ T: Read + Write + Unpin,
+ S: HttpService<IncomingBody, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ E: Http2ServerConnExec<S::Future, B>,
+{
+ pub(crate) fn new(
+ io: T,
+ service: S,
+ config: &Config,
+ exec: E,
+ timer: Time,
+ ) -> Server<T, S, B, E> {
+ let mut builder = h2::server::Builder::default();
+ builder
+ .initial_window_size(config.initial_stream_window_size)
+ .initial_connection_window_size(config.initial_conn_window_size)
+ .max_frame_size(config.max_frame_size)
+ .max_header_list_size(config.max_header_list_size)
+ .max_local_error_reset_streams(config.max_local_error_reset_streams)
+ .max_send_buffer_size(config.max_send_buffer_size);
+ if let Some(max) = config.max_concurrent_streams {
+ builder.max_concurrent_streams(max);
+ }
+ if let Some(max) = config.max_pending_accept_reset_streams {
+ builder.max_pending_accept_reset_streams(max);
+ }
+ if config.enable_connect_protocol {
+ builder.enable_connect_protocol();
+ }
+ let handshake = builder.handshake(Compat::new(io));
+
+ let bdp = if config.adaptive_window {
+ Some(config.initial_stream_window_size)
+ } else {
+ None
+ };
+
+ let ping_config = ping::Config {
+ bdp_initial_window: bdp,
+ keep_alive_interval: config.keep_alive_interval,
+ keep_alive_timeout: config.keep_alive_timeout,
+ // If keep-alive is enabled for servers, always enabled while
+ // idle, so it can more aggressively close dead connections.
+ keep_alive_while_idle: true,
+ };
+
+ Server {
+ exec,
+ timer,
+ state: State::Handshaking {
+ ping_config,
+ hs: handshake,
+ },
+ service,
+ date_header: config.date_header,
+ close_pending: false,
+ }
+ }
+
+ pub(crate) fn graceful_shutdown(&mut self) {
+ trace!("graceful_shutdown");
+ match self.state {
+ State::Handshaking { .. } => {
+ self.close_pending = true;
+ }
+ State::Serving(ref mut srv) => {
+ if srv.closing.is_none() {
+ srv.conn.graceful_shutdown();
+ }
+ }
+ }
+ }
+}
+
+impl<T, S, B, E> Future for Server<T, S, B, E>
+where
+ T: Read + Write + Unpin,
+ S: HttpService<IncomingBody, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ B: Body + 'static,
+ E: Http2ServerConnExec<S::Future, B>,
+{
+ type Output = crate::Result<Dispatched>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = &mut *self;
+ loop {
+ let next = match me.state {
+ State::Handshaking {
+ ref mut hs,
+ ref ping_config,
+ } => {
+ let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
+ let ping = if ping_config.is_enabled() {
+ let pp = conn.ping_pong().expect("conn.ping_pong");
+ Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
+ } else {
+ None
+ };
+ State::Serving(Serving {
+ ping,
+ conn,
+ closing: None,
+ date_header: me.date_header,
+ })
+ }
+ State::Serving(ref mut srv) => {
+ // graceful_shutdown was called before handshaking finished,
+ if me.close_pending && srv.closing.is_none() {
+ srv.conn.graceful_shutdown();
+ }
+ ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
+ return Poll::Ready(Ok(Dispatched::Shutdown));
+ }
+ };
+ me.state = next;
+ }
+ }
+}
+
+impl<T, B> Serving<T, B>
+where
+ T: Read + Write + Unpin,
+ B: Body + 'static,
+{
+ fn poll_server<S, E>(
+ &mut self,
+ cx: &mut Context<'_>,
+ service: &mut S,
+ exec: &mut E,
+ ) -> Poll<crate::Result<()>>
+ where
+ S: HttpService<IncomingBody, ResBody = B>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: Http2ServerConnExec<S::Future, B>,
+ {
+ if self.closing.is_none() {
+ loop {
+ self.poll_ping(cx);
+
+ match ready!(self.conn.poll_accept(cx)) {
+ Some(Ok((req, mut respond))) => {
+ trace!("incoming request");
+ let content_length = headers::content_length_parse_all(req.headers());
+ let ping = self
+ .ping
+ .as_ref()
+ .map(|ping| ping.0.clone())
+ .unwrap_or_else(ping::disabled);
+
+ // Record the headers received
+ ping.record_non_data();
+
+ let is_connect = req.method() == Method::CONNECT;
+ let (mut parts, stream) = req.into_parts();
+ let (mut req, connect_parts) = if !is_connect {
+ (
+ Request::from_parts(
+ parts,
+ IncomingBody::h2(stream, content_length.into(), ping),
+ ),
+ None,
+ )
+ } else {
+ if content_length.map_or(false, |len| len != 0) {
+ warn!("h2 connect request with non-zero body not supported");
+ respond.send_reset(h2::Reason::INTERNAL_ERROR);
+ return Poll::Ready(Ok(()));
+ }
+ let (pending, upgrade) = crate::upgrade::pending();
+ debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
+ parts.extensions.insert(upgrade);
+ (
+ Request::from_parts(parts, IncomingBody::empty()),
+ Some(ConnectParts {
+ pending,
+ ping,
+ recv_stream: stream,
+ }),
+ )
+ };
+
+ if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
+ req.extensions_mut().insert(Protocol::from_inner(protocol));
+ }
+
+ let fut = H2Stream::new(
+ service.call(req),
+ connect_parts,
+ respond,
+ self.date_header,
+ );
+
+ exec.execute_h2stream(fut);
+ }
+ Some(Err(e)) => {
+ return Poll::Ready(Err(crate::Error::new_h2(e)));
+ }
+ None => {
+ // no more incoming streams...
+ if let Some((ref ping, _)) = self.ping {
+ ping.ensure_not_timed_out()?;
+ }
+
+ trace!("incoming connection complete");
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+ }
+
+ debug_assert!(
+ self.closing.is_some(),
+ "poll_server broke loop without closing"
+ );
+
+ ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
+
+ Poll::Ready(Err(self.closing.take().expect("polled after error")))
+ }
+
+ fn poll_ping(&mut self, cx: &mut Context<'_>) {
+ if let Some((_, ref mut estimator)) = self.ping {
+ match estimator.poll(cx) {
+ Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
+ self.conn.set_target_window_size(wnd);
+ let _ = self.conn.set_initial_window_size(wnd);
+ }
+ Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
+ debug!("keep-alive timed out, closing connection");
+ self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
+ }
+ Poll::Pending => {}
+ }
+ }
+ }
+}
+
+pin_project! {
+ #[allow(missing_debug_implementations)]
+ pub struct H2Stream<F, B>
+ where
+ B: Body,
+ {
+ reply: SendResponse<SendBuf<B::Data>>,
+ #[pin]
+ state: H2StreamState<F, B>,
+ date_header: bool,
+ }
+}
+
+pin_project! {
+ #[project = H2StreamStateProj]
+ enum H2StreamState<F, B>
+ where
+ B: Body,
+ {
+ Service {
+ #[pin]
+ fut: F,
+ connect_parts: Option<ConnectParts>,
+ },
+ Body {
+ #[pin]
+ pipe: PipeToSendStream<B>,
+ },
+ }
+}
+
+struct ConnectParts {
+ pending: Pending,
+ ping: Recorder,
+ recv_stream: RecvStream,
+}
+
+impl<F, B> H2Stream<F, B>
+where
+ B: Body,
+{
+ fn new(
+ fut: F,
+ connect_parts: Option<ConnectParts>,
+ respond: SendResponse<SendBuf<B::Data>>,
+ date_header: bool,
+ ) -> H2Stream<F, B> {
+ H2Stream {
+ reply: respond,
+ state: H2StreamState::Service { fut, connect_parts },
+ date_header,
+ }
+ }
+}
+
+macro_rules! reply {
+ ($me:expr, $res:expr, $eos:expr) => {{
+ match $me.reply.send_response($res, $eos) {
+ Ok(tx) => tx,
+ Err(e) => {
+ debug!("send response error: {}", e);
+ $me.reply.send_reset(Reason::INTERNAL_ERROR);
+ return Poll::Ready(Err(crate::Error::new_h2(e)));
+ }
+ }
+ }};
+}
+
+impl<F, B, E> H2Stream<F, B>
+where
+ F: Future<Output = Result<Response<B>, E>>,
+ B: Body,
+ B::Data: 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: Into<Box<dyn StdError + Send + Sync>>,
+{
+ fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
+ let mut me = self.project();
+ loop {
+ let next = match me.state.as_mut().project() {
+ H2StreamStateProj::Service {
+ fut: h,
+ connect_parts,
+ } => {
+ let res = match h.poll(cx) {
+ Poll::Ready(Ok(r)) => r,
+ Poll::Pending => {
+ // Response is not yet ready, so we want to check if the client has sent a
+ // RST_STREAM frame which would cancel the current request.
+ if let Poll::Ready(reason) =
+ me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
+ {
+ debug!("stream received RST_STREAM: {:?}", reason);
+ return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
+ }
+ return Poll::Pending;
+ }
+ Poll::Ready(Err(e)) => {
+ let err = crate::Error::new_user_service(e);
+ warn!("http2 service errored: {}", err);
+ me.reply.send_reset(err.h2_reason());
+ return Poll::Ready(Err(err));
+ }
+ };
+
+ let (head, body) = res.into_parts();
+ let mut res = ::http::Response::from_parts(head, ());
+ super::strip_connection_headers(res.headers_mut(), false);
+
+ // set Date header if it isn't already set if instructed
+ if *me.date_header {
+ res.headers_mut()
+ .entry(::http::header::DATE)
+ .or_insert_with(date::update_and_header_value);
+ }
+
+ if let Some(connect_parts) = connect_parts.take() {
+ if res.status().is_success() {
+ if headers::content_length_parse_all(res.headers())
+ .map_or(false, |len| len != 0)
+ {
+ warn!("h2 successful response to CONNECT request with body not supported");
+ me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
+ return Poll::Ready(Err(crate::Error::new_user_header()));
+ }
+ if res
+ .headers_mut()
+ .remove(::http::header::CONTENT_LENGTH)
+ .is_some()
+ {
+ warn!("successful response to CONNECT request disallows content-length header");
+ }
+ let send_stream = reply!(me, res, false);
+ connect_parts.pending.fulfill(Upgraded::new(
+ H2Upgraded {
+ ping: connect_parts.ping,
+ recv_stream: connect_parts.recv_stream,
+ send_stream: unsafe { UpgradedSendStream::new(send_stream) },
+ buf: Bytes::new(),
+ },
+ Bytes::new(),
+ ));
+ return Poll::Ready(Ok(()));
+ }
+ }
+
+ if !body.is_end_stream() {
+ // automatically set Content-Length from body...
+ if let Some(len) = body.size_hint().exact() {
+ headers::set_content_length_if_missing(res.headers_mut(), len);
+ }
+
+ let body_tx = reply!(me, res, false);
+ H2StreamState::Body {
+ pipe: PipeToSendStream::new(body, body_tx),
+ }
+ } else {
+ reply!(me, res, true);
+ return Poll::Ready(Ok(()));
+ }
+ }
+ H2StreamStateProj::Body { pipe } => {
+ return pipe.poll(cx);
+ }
+ };
+ me.state.set(next);
+ }
+ }
+}
+
+impl<F, B, E> Future for H2Stream<F, B>
+where
+ F: Future<Output = Result<Response<B>, E>>,
+ B: Body,
+ B::Data: 'static,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ E: Into<Box<dyn StdError + Send + Sync>>,
+{
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.poll2(cx).map(|res| {
+ if let Err(_e) = res {
+ debug!("stream error: {}", _e);
+ }
+ })
+ }
+}