diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
| commit | 8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch) | |
| tree | 22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/hyper/src/proto/h2/client.rs | |
| parent | 4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff) | |
chore: add vendor directory
Diffstat (limited to 'vendor/hyper/src/proto/h2/client.rs')
| -rw-r--r-- | vendor/hyper/src/proto/h2/client.rs | 749 |
1 files changed, 749 insertions, 0 deletions
diff --git a/vendor/hyper/src/proto/h2/client.rs b/vendor/hyper/src/proto/h2/client.rs new file mode 100644 index 00000000..5e9641e4 --- /dev/null +++ b/vendor/hyper/src/proto/h2/client.rs @@ -0,0 +1,749 @@ +use std::{ + convert::Infallible, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use crate::rt::{Read, Write}; +use bytes::Bytes; +use futures_channel::mpsc::{Receiver, Sender}; +use futures_channel::{mpsc, oneshot}; +use futures_util::future::{Either, FusedFuture, FutureExt as _}; +use futures_util::ready; +use futures_util::stream::{StreamExt as _, StreamFuture}; +use h2::client::{Builder, Connection, SendRequest}; +use h2::SendStream; +use http::{Method, StatusCode}; +use pin_project_lite::pin_project; + +use super::ping::{Ponger, Recorder}; +use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; +use crate::body::{Body, Incoming as IncomingBody}; +use crate::client::dispatch::{Callback, SendWhen, TrySendError}; +use crate::common::io::Compat; +use crate::common::time::Time; +use crate::ext::Protocol; +use crate::headers; +use crate::proto::h2::UpgradedSendStream; +use crate::proto::Dispatched; +use crate::rt::bounds::Http2ClientConnExec; +use crate::upgrade::Upgraded; +use crate::{Request, Response}; +use h2::client::ResponseFuture; + +type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>; + +///// An mpsc channel is used to help notify the `Connection` task when *all* +///// other handles to it have been dropped, so that it can shutdown. +type ConnDropRef = mpsc::Sender<Infallible>; + +///// A oneshot channel watches the `Connection` task, and when it completes, +///// the "dispatch" task will be notified and can shutdown sooner. +type ConnEof = oneshot::Receiver<Infallible>; + +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb +const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb +const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb +const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb + +// The maximum number of concurrent streams that the client is allowed to open +// before it receives the initial SETTINGS frame from the server. +// This default value is derived from what the HTTP/2 spec recommends as the +// minimum value that endpoints advertise to their peers. It means that using +// this value will minimize the chance of the failure where the local endpoint +// attempts to open too many streams and gets rejected by the remote peer with +// the `REFUSED_STREAM` error. +const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100; + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, + pub(crate) initial_max_send_streams: usize, + pub(crate) max_frame_size: Option<u32>, + pub(crate) max_header_list_size: u32, + pub(crate) keep_alive_interval: Option<Duration>, + pub(crate) keep_alive_timeout: Duration, + pub(crate) keep_alive_while_idle: bool, + pub(crate) max_concurrent_reset_streams: Option<usize>, + pub(crate) max_send_buffer_size: usize, + pub(crate) max_pending_accept_reset_streams: Option<usize>, + pub(crate) header_table_size: Option<u32>, + pub(crate) max_concurrent_streams: Option<u32>, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS, + max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE), + max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE, + keep_alive_interval: None, + keep_alive_timeout: Duration::from_secs(20), + keep_alive_while_idle: false, + max_concurrent_reset_streams: None, + max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, + max_pending_accept_reset_streams: None, + header_table_size: None, + max_concurrent_streams: None, + } + } +} + +fn new_builder(config: &Config) -> Builder { + let mut builder = Builder::default(); + builder + .initial_max_send_streams(config.initial_max_send_streams) + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size) + .max_header_list_size(config.max_header_list_size) + .max_send_buffer_size(config.max_send_buffer_size) + .enable_push(false); + if let Some(max) = config.max_frame_size { + builder.max_frame_size(max); + } + if let Some(max) = config.max_concurrent_reset_streams { + builder.max_concurrent_reset_streams(max); + } + if let Some(max) = config.max_pending_accept_reset_streams { + builder.max_pending_accept_reset_streams(max); + } + if let Some(size) = config.header_table_size { + builder.header_table_size(size); + } + if let Some(max) = config.max_concurrent_streams { + builder.max_concurrent_streams(max); + } + builder +} + +fn new_ping_config(config: &Config) -> ping::Config { + ping::Config { + bdp_initial_window: if config.adaptive_window { + Some(config.initial_stream_window_size) + } else { + None + }, + keep_alive_interval: config.keep_alive_interval, + keep_alive_timeout: config.keep_alive_timeout, + keep_alive_while_idle: config.keep_alive_while_idle, + } +} + +pub(crate) async fn handshake<T, B, E>( + io: T, + req_rx: ClientRx<B>, + config: &Config, + mut exec: E, + timer: Time, +) -> crate::Result<ClientTask<B, E, T>> +where + T: Read + Write + Unpin, + B: Body + 'static, + B::Data: Send + 'static, + E: Http2ClientConnExec<B, T> + Unpin, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, +{ + let (h2_tx, mut conn) = new_builder(config) + .handshake::<_, SendBuf<B::Data>>(Compat::new(io)) + .await + .map_err(crate::Error::new_h2)?; + + // An mpsc channel is used entirely to detect when the + // 'Client' has been dropped. This is to get around a bug + // in h2 where dropping all SendRequests won't notify a + // parked Connection. + let (conn_drop_ref, rx) = mpsc::channel(1); + let (cancel_tx, conn_eof) = oneshot::channel(); + + let conn_drop_rx = rx.into_future(); + + let ping_config = new_ping_config(config); + + let (conn, ping) = if ping_config.is_enabled() { + let pp = conn.ping_pong().expect("conn.ping_pong"); + let (recorder, ponger) = ping::channel(pp, ping_config, timer); + + let conn: Conn<_, B> = Conn::new(ponger, conn); + (Either::Left(conn), recorder) + } else { + (Either::Right(conn), ping::disabled()) + }; + let conn: ConnMapErr<T, B> = ConnMapErr { + conn, + is_terminated: false, + }; + + exec.execute_h2_future(H2ClientFuture::Task { + task: ConnTask::new(conn, conn_drop_rx, cancel_tx), + }); + + Ok(ClientTask { + ping, + conn_drop_ref, + conn_eof, + executor: exec, + h2_tx, + req_rx, + fut_ctx: None, + marker: PhantomData, + }) +} + +pin_project! { + struct Conn<T, B> + where + B: Body, + { + #[pin] + ponger: Ponger, + #[pin] + conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>, + } +} + +impl<T, B> Conn<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self { + Conn { ponger, conn } + } +} + +impl<T, B> Future for Conn<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + type Output = Result<(), h2::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + match this.ponger.poll(cx) { + Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => { + this.conn.set_target_window_size(wnd); + this.conn.set_initial_window_size(wnd)?; + } + Poll::Ready(ping::Ponged::KeepAliveTimedOut) => { + debug!("connection keep-alive timed out"); + return Poll::Ready(Ok(())); + } + Poll::Pending => {} + } + + Pin::new(&mut this.conn).poll(cx) + } +} + +pin_project! { + struct ConnMapErr<T, B> + where + B: Body, + T: Read, + T: Write, + T: Unpin, + { + #[pin] + conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>, + #[pin] + is_terminated: bool, + } +} + +impl<T, B> Future for ConnMapErr<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + type Output = Result<(), ()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + if *this.is_terminated { + return Poll::Pending; + } + let polled = this.conn.poll(cx); + if polled.is_ready() { + *this.is_terminated = true; + } + polled.map_err(|_e| { + debug!(error = %_e, "connection error"); + }) + } +} + +impl<T, B> FusedFuture for ConnMapErr<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + fn is_terminated(&self) -> bool { + self.is_terminated + } +} + +pin_project! { + pub struct ConnTask<T, B> + where + B: Body, + T: Read, + T: Write, + T: Unpin, + { + #[pin] + drop_rx: StreamFuture<Receiver<Infallible>>, + #[pin] + cancel_tx: Option<oneshot::Sender<Infallible>>, + #[pin] + conn: ConnMapErr<T, B>, + } +} + +impl<T, B> ConnTask<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + fn new( + conn: ConnMapErr<T, B>, + drop_rx: StreamFuture<Receiver<Infallible>>, + cancel_tx: oneshot::Sender<Infallible>, + ) -> Self { + Self { + drop_rx, + cancel_tx: Some(cancel_tx), + conn, + } + } +} + +impl<T, B> Future for ConnTask<T, B> +where + B: Body, + T: Read + Write + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() { + // ok or err, the `conn` has finished. + return Poll::Ready(()); + } + + if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close. + trace!("send_request dropped, starting conn shutdown"); + drop(this.cancel_tx.take().expect("ConnTask Future polled twice")); + } + + Poll::Pending + } +} + +pin_project! { + #[project = H2ClientFutureProject] + pub enum H2ClientFuture<B, T> + where + B: http_body::Body, + B: 'static, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read, + T: Write, + T: Unpin, + { + Pipe { + #[pin] + pipe: PipeMap<B>, + }, + Send { + #[pin] + send_when: SendWhen<B>, + }, + Task { + #[pin] + task: ConnTask<T, B>, + }, + } +} + +impl<B, T> Future for H2ClientFuture<B, T> +where + B: http_body::Body + 'static, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read + Write + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { + let this = self.project(); + + match this { + H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx), + H2ClientFutureProject::Send { send_when } => send_when.poll(cx), + H2ClientFutureProject::Task { task } => task.poll(cx), + } + } +} + +struct FutCtx<B> +where + B: Body, +{ + is_connect: bool, + eos: bool, + fut: ResponseFuture, + body_tx: SendStream<SendBuf<B::Data>>, + body: B, + cb: Callback<Request<B>, Response<IncomingBody>>, +} + +impl<B: Body> Unpin for FutCtx<B> {} + +pub(crate) struct ClientTask<B, E, T> +where + B: Body, + E: Unpin, +{ + ping: ping::Recorder, + conn_drop_ref: ConnDropRef, + conn_eof: ConnEof, + executor: E, + h2_tx: SendRequest<SendBuf<B::Data>>, + req_rx: ClientRx<B>, + fut_ctx: Option<FutCtx<B>>, + marker: PhantomData<T>, +} + +impl<B, E, T> ClientTask<B, E, T> +where + B: Body + 'static, + E: Http2ClientConnExec<B, T> + Unpin, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read + Write + Unpin, +{ + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.h2_tx.is_extended_connect_protocol_enabled() + } +} + +pin_project! { + pub struct PipeMap<S> + where + S: Body, + { + #[pin] + pipe: PipeToSendStream<S>, + #[pin] + conn_drop_ref: Option<Sender<Infallible>>, + #[pin] + ping: Option<Recorder>, + } +} + +impl<B> Future for PipeMap<B> +where + B: http_body::Body, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { + let mut this = self.project(); + + match this.pipe.poll_unpin(cx) { + Poll::Ready(result) => { + if let Err(_e) = result { + debug!("client request body error: {}", _e); + } + drop(this.conn_drop_ref.take().expect("Future polled twice")); + drop(this.ping.take().expect("Future polled twice")); + return Poll::Ready(()); + } + Poll::Pending => (), + }; + Poll::Pending + } +} + +impl<B, E, T> ClientTask<B, E, T> +where + B: Body + 'static + Unpin, + B::Data: Send, + E: Http2ClientConnExec<B, T> + Unpin, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + T: Read + Write + Unpin, +{ + fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) { + let ping = self.ping.clone(); + + let send_stream = if !f.is_connect { + if !f.eos { + let mut pipe = PipeToSendStream::new(f.body, f.body_tx); + + // eagerly see if the body pipe is ready and + // can thus skip allocating in the executor + match Pin::new(&mut pipe).poll(cx) { + Poll::Ready(_) => (), + Poll::Pending => { + let conn_drop_ref = self.conn_drop_ref.clone(); + // keep the ping recorder's knowledge of an + // "open stream" alive while this body is + // still sending... + let ping = ping.clone(); + + let pipe = PipeMap { + pipe, + conn_drop_ref: Some(conn_drop_ref), + ping: Some(ping), + }; + // Clear send task + self.executor + .execute_h2_future(H2ClientFuture::Pipe { pipe }); + } + } + } + + None + } else { + Some(f.body_tx) + }; + + self.executor.execute_h2_future(H2ClientFuture::Send { + send_when: SendWhen { + when: ResponseFutMap { + fut: f.fut, + ping: Some(ping), + send_stream: Some(send_stream), + }, + call_back: Some(f.cb), + }, + }); + } +} + +pin_project! { + pub(crate) struct ResponseFutMap<B> + where + B: Body, + B: 'static, + { + #[pin] + fut: ResponseFuture, + #[pin] + ping: Option<Recorder>, + #[pin] + send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>, + } +} + +impl<B> Future for ResponseFutMap<B> +where + B: Body + 'static, +{ + type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + + let result = ready!(this.fut.poll(cx)); + + let ping = this.ping.take().expect("Future polled twice"); + let send_stream = this.send_stream.take().expect("Future polled twice"); + + match result { + Ok(res) => { + // record that we got the response headers + ping.record_non_data(); + + let content_length = headers::content_length_parse_all(res.headers()); + if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) { + if content_length.map_or(false, |len| len != 0) { + warn!("h2 connect response with non-zero body not supported"); + + send_stream.send_reset(h2::Reason::INTERNAL_ERROR); + return Poll::Ready(Err(( + crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + None::<Request<B>>, + ))); + } + let (parts, recv_stream) = res.into_parts(); + let mut res = Response::from_parts(parts, IncomingBody::empty()); + + let (pending, on_upgrade) = crate::upgrade::pending(); + let io = H2Upgraded { + ping, + send_stream: unsafe { UpgradedSendStream::new(send_stream) }, + recv_stream, + buf: Bytes::new(), + }; + let upgraded = Upgraded::new(io, Bytes::new()); + + pending.fulfill(upgraded); + res.extensions_mut().insert(on_upgrade); + + Poll::Ready(Ok(res)) + } else { + let res = res.map(|stream| { + let ping = ping.for_stream(&stream); + IncomingBody::h2(stream, content_length.into(), ping) + }); + Poll::Ready(Ok(res)) + } + } + Err(err) => { + ping.ensure_not_timed_out().map_err(|e| (e, None))?; + + debug!("client response error: {}", err); + Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>))) + } + } + } +} + +impl<B, E, T> Future for ClientTask<B, E, T> +where + B: Body + 'static + Unpin, + B::Data: Send, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, + E: Http2ClientConnExec<B, T> + Unpin, + T: Read + Write + Unpin, +{ + type Output = crate::Result<Dispatched>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + loop { + match ready!(self.h2_tx.poll_ready(cx)) { + Ok(()) => (), + Err(err) => { + self.ping.ensure_not_timed_out()?; + return if err.reason() == Some(::h2::Reason::NO_ERROR) { + trace!("connection gracefully shutdown"); + Poll::Ready(Ok(Dispatched::Shutdown)) + } else { + Poll::Ready(Err(crate::Error::new_h2(err))) + }; + } + }; + + // If we were waiting on pending open + // continue where we left off. + if let Some(f) = self.fut_ctx.take() { + self.poll_pipe(f, cx); + continue; + } + + match self.req_rx.poll_recv(cx) { + Poll::Ready(Some((req, cb))) => { + // check that future hasn't been canceled already + if cb.is_canceled() { + trace!("request callback is canceled"); + continue; + } + let (head, body) = req.into_parts(); + let mut req = ::http::Request::from_parts(head, ()); + super::strip_connection_headers(req.headers_mut(), true); + if let Some(len) = body.size_hint().exact() { + if len != 0 || headers::method_has_defined_payload_semantics(req.method()) { + headers::set_content_length_if_missing(req.headers_mut(), len); + } + } + + let is_connect = req.method() == Method::CONNECT; + let eos = body.is_end_stream(); + + if is_connect + && headers::content_length_parse_all(req.headers()) + .map_or(false, |len| len != 0) + { + warn!("h2 connect request with non-zero body not supported"); + cb.send(Err(TrySendError { + error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + message: None, + })); + continue; + } + + if let Some(protocol) = req.extensions_mut().remove::<Protocol>() { + req.extensions_mut().insert(protocol.into_inner()); + } + + let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { + Ok(ok) => ok, + Err(err) => { + debug!("client send request error: {}", err); + cb.send(Err(TrySendError { + error: crate::Error::new_h2(err), + message: None, + })); + continue; + } + }; + + let f = FutCtx { + is_connect, + eos, + fut, + body_tx, + body, + cb, + }; + + // Check poll_ready() again. + // If the call to send_request() resulted in the new stream being pending open + // we have to wait for the open to complete before accepting new requests. + match self.h2_tx.poll_ready(cx) { + Poll::Pending => { + // Save Context + self.fut_ctx = Some(f); + return Poll::Pending; + } + Poll::Ready(Ok(())) => (), + Poll::Ready(Err(err)) => { + f.cb.send(Err(TrySendError { + error: crate::Error::new_h2(err), + message: None, + })); + continue; + } + } + self.poll_pipe(f, cx); + continue; + } + + Poll::Ready(None) => { + trace!("client::dispatch::Sender dropped"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + + Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { + // As of Rust 1.82, this pattern is no longer needed, and emits a warning. + // But we cannot remove it as long as MSRV is less than that. + #[allow(unused)] + Ok(never) => match never {}, + Err(_conn_is_eof) => { + trace!("connection task is closed, closing dispatch task"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } + }, + } + } + } +} |
