diff options
Diffstat (limited to 'vendor/hyper-util/src/rt')
| -rw-r--r-- | vendor/hyper-util/src/rt/io.rs | 34 | ||||
| -rw-r--r-- | vendor/hyper-util/src/rt/mod.rs | 12 | ||||
| -rw-r--r-- | vendor/hyper-util/src/rt/tokio.rs | 339 | ||||
| -rw-r--r-- | vendor/hyper-util/src/rt/tokio/with_hyper_io.rs | 170 | ||||
| -rw-r--r-- | vendor/hyper-util/src/rt/tokio/with_tokio_io.rs | 178 |
5 files changed, 733 insertions, 0 deletions
diff --git a/vendor/hyper-util/src/rt/io.rs b/vendor/hyper-util/src/rt/io.rs new file mode 100644 index 00000000..888756f6 --- /dev/null +++ b/vendor/hyper-util/src/rt/io.rs @@ -0,0 +1,34 @@ +use std::marker::Unpin; +use std::pin::Pin; +use std::task::Poll; + +use futures_core::ready; +use hyper::rt::{Read, ReadBuf, Write}; + +use crate::common::future::poll_fn; + +pub(crate) async fn read<T>(io: &mut T, buf: &mut [u8]) -> Result<usize, std::io::Error> +where + T: Read + Unpin, +{ + poll_fn(move |cx| { + let mut buf = ReadBuf::new(buf); + ready!(Pin::new(&mut *io).poll_read(cx, buf.unfilled()))?; + Poll::Ready(Ok(buf.filled().len())) + }) + .await +} + +pub(crate) async fn write_all<T>(io: &mut T, buf: &[u8]) -> Result<(), std::io::Error> +where + T: Write + Unpin, +{ + let mut n = 0; + poll_fn(move |cx| { + while n < buf.len() { + n += ready!(Pin::new(&mut *io).poll_write(cx, &buf[n..])?); + } + Poll::Ready(Ok(())) + }) + .await +} diff --git a/vendor/hyper-util/src/rt/mod.rs b/vendor/hyper-util/src/rt/mod.rs new file mode 100644 index 00000000..71363ccd --- /dev/null +++ b/vendor/hyper-util/src/rt/mod.rs @@ -0,0 +1,12 @@ +//! Runtime utilities + +#[cfg(feature = "client-legacy")] +mod io; +#[cfg(feature = "client-legacy")] +pub(crate) use self::io::{read, write_all}; + +#[cfg(feature = "tokio")] +pub mod tokio; + +#[cfg(feature = "tokio")] +pub use self::tokio::{TokioExecutor, TokioIo, TokioTimer}; diff --git a/vendor/hyper-util/src/rt/tokio.rs b/vendor/hyper-util/src/rt/tokio.rs new file mode 100644 index 00000000..46ffeba8 --- /dev/null +++ b/vendor/hyper-util/src/rt/tokio.rs @@ -0,0 +1,339 @@ +//! [`tokio`] runtime components integration for [`hyper`]. +//! +//! [`hyper::rt`] exposes a set of traits to allow hyper to be agnostic to +//! its underlying asynchronous runtime. This submodule provides glue for +//! [`tokio`] users to bridge those types to [`hyper`]'s interfaces. +//! +//! # IO +//! +//! [`hyper`] abstracts over asynchronous readers and writers using [`Read`] +//! and [`Write`], while [`tokio`] abstracts over this using [`AsyncRead`] +//! and [`AsyncWrite`]. This submodule provides a collection of IO adaptors +//! to bridge these two IO ecosystems together: [`TokioIo<I>`], +//! [`WithHyperIo<I>`], and [`WithTokioIo<I>`]. +//! +//! To compare and constrast these IO adaptors and to help explain which +//! is the proper choice for your needs, here is a table showing which IO +//! traits these implement, given two types `T` and `H` which implement +//! Tokio's and Hyper's corresponding IO traits: +//! +//! | | [`AsyncRead`] | [`AsyncWrite`] | [`Read`] | [`Write`] | +//! |--------------------|------------------|-------------------|--------------|--------------| +//! | `T` | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | +//! | `H` | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | +//! | [`TokioIo<T>`] | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | +//! | [`TokioIo<H>`] | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | +//! | [`WithHyperIo<T>`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | +//! | [`WithHyperIo<H>`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | +//! | [`WithTokioIo<T>`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | +//! | [`WithTokioIo<H>`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | +//! +//! For most situations, [`TokioIo<I>`] is the proper choice. This should be +//! constructed, wrapping some underlying [`hyper`] or [`tokio`] IO, at the +//! call-site of a function like [`hyper::client::conn::http1::handshake`]. +//! +//! [`TokioIo<I>`] switches across these ecosystems, but notably does not +//! preserve the existing IO trait implementations of its underlying IO. If +//! one wishes to _extend_ IO with additional implementations, +//! [`WithHyperIo<I>`] and [`WithTokioIo<I>`] are the correct choice. +//! +//! For example, a Tokio reader/writer can be wrapped in [`WithHyperIo<I>`]. +//! That will implement _both_ sets of IO traits. Conversely, +//! [`WithTokioIo<I>`] will implement both sets of IO traits given a +//! reader/writer that implements Hyper's [`Read`] and [`Write`]. +//! +//! See [`tokio::io`] and ["_Asynchronous IO_"][tokio-async-docs] for more +//! information. +//! +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite +//! [`Read`]: hyper::rt::Read +//! [`Write`]: hyper::rt::Write +//! [tokio-async-docs]: https://docs.rs/tokio/latest/tokio/#asynchronous-io + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use hyper::rt::{Executor, Sleep, Timer}; +use pin_project_lite::pin_project; + +#[cfg(feature = "tracing")] +use tracing::instrument::Instrument; + +pub use self::{with_hyper_io::WithHyperIo, with_tokio_io::WithTokioIo}; + +mod with_hyper_io; +mod with_tokio_io; + +/// Future executor that utilises `tokio` threads. +#[non_exhaustive] +#[derive(Default, Debug, Clone)] +pub struct TokioExecutor {} + +pin_project! { + /// A wrapper that implements Tokio's IO traits for an inner type that + /// implements hyper's IO traits, or vice versa (implements hyper's IO + /// traits for a type that implements Tokio's IO traits). + #[derive(Debug)] + pub struct TokioIo<T> { + #[pin] + inner: T, + } +} + +/// A Timer that uses the tokio runtime. +#[non_exhaustive] +#[derive(Default, Clone, Debug)] +pub struct TokioTimer; + +// Use TokioSleep to get tokio::time::Sleep to implement Unpin. +// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html +pin_project! { + #[derive(Debug)] + struct TokioSleep { + #[pin] + inner: tokio::time::Sleep, + } +} + +// ===== impl TokioExecutor ===== + +impl<Fut> Executor<Fut> for TokioExecutor +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + fn execute(&self, fut: Fut) { + #[cfg(feature = "tracing")] + tokio::spawn(fut.in_current_span()); + + #[cfg(not(feature = "tracing"))] + tokio::spawn(fut); + } +} + +impl TokioExecutor { + /// Create new executor that relies on [`tokio::spawn`] to execute futures. + pub fn new() -> Self { + Self {} + } +} + +// ==== impl TokioIo ===== + +impl<T> TokioIo<T> { + /// Wrap a type implementing Tokio's or hyper's IO traits. + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Borrow the inner type. + pub fn inner(&self) -> &T { + &self.inner + } + + /// Mut borrow the inner type. + pub fn inner_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consume this wrapper and get the inner type. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl<T> hyper::rt::Read for TokioIo<T> +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll<Result<(), std::io::Error>> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl<T> hyper::rt::Write for TokioIo<T> +where + T: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl<T> tokio::io::AsyncRead for TokioIo<T> +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<Result<(), std::io::Error>> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl<T> tokio::io::AsyncWrite for TokioIo<T> +where + T: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +// ==== impl TokioTimer ===== + +impl Timer for TokioTimer { + fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> { + Box::pin(TokioSleep { + inner: tokio::time::sleep(duration), + }) + } + + fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> { + Box::pin(TokioSleep { + inner: tokio::time::sleep_until(deadline.into()), + }) + } + + fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) { + if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() { + sleep.reset(new_deadline) + } + } +} + +impl TokioTimer { + /// Create a new TokioTimer + pub fn new() -> Self { + Self {} + } +} + +impl Future for TokioSleep { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.project().inner.poll(cx) + } +} + +impl Sleep for TokioSleep {} + +impl TokioSleep { + fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.project().inner.as_mut().reset(deadline.into()); + } +} + +#[cfg(test)] +mod tests { + use crate::rt::TokioExecutor; + use hyper::rt::Executor; + use tokio::sync::oneshot; + + #[cfg(not(miri))] + #[tokio::test] + async fn simple_execute() -> Result<(), Box<dyn std::error::Error>> { + let (tx, rx) = oneshot::channel(); + let executor = TokioExecutor::new(); + executor.execute(async move { + tx.send(()).unwrap(); + }); + rx.await.map_err(Into::into) + } +} diff --git a/vendor/hyper-util/src/rt/tokio/with_hyper_io.rs b/vendor/hyper-util/src/rt/tokio/with_hyper_io.rs new file mode 100644 index 00000000..9c5072d4 --- /dev/null +++ b/vendor/hyper-util/src/rt/tokio/with_hyper_io.rs @@ -0,0 +1,170 @@ +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Extends an underlying [`tokio`] I/O with [`hyper`] I/O implementations. + /// + /// This implements [`Read`] and [`Write`] given an inner type that implements [`AsyncRead`] + /// and [`AsyncWrite`], respectively. + #[derive(Debug)] + pub struct WithHyperIo<I> { + #[pin] + inner: I, + } +} + +// ==== impl WithHyperIo ===== + +impl<I> WithHyperIo<I> { + /// Wraps the inner I/O in an [`WithHyperIo<I>`] + pub fn new(inner: I) -> Self { + Self { inner } + } + + /// Returns a reference to the inner type. + pub fn inner(&self) -> &I { + &self.inner + } + + /// Returns a mutable reference to the inner type. + pub fn inner_mut(&mut self) -> &mut I { + &mut self.inner + } + + /// Consumes this wrapper and returns the inner type. + pub fn into_inner(self) -> I { + self.inner + } +} + +/// [`WithHyperIo<I>`] is [`Read`] if `I` is [`AsyncRead`]. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +/// [`Read`]: hyper::rt::Read +impl<I> hyper::rt::Read for WithHyperIo<I> +where + I: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll<Result<(), std::io::Error>> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +/// [`WithHyperIo<I>`] is [`Write`] if `I` is [`AsyncWrite`]. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +/// [`Write`]: hyper::rt::Write +impl<I> hyper::rt::Write for WithHyperIo<I> +where + I: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +/// [`WithHyperIo<I>`] exposes its inner `I`'s [`AsyncRead`] implementation. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +impl<I> tokio::io::AsyncRead for WithHyperIo<I> +where + I: tokio::io::AsyncRead, +{ + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<Result<(), std::io::Error>> { + self.project().inner.poll_read(cx, buf) + } +} + +/// [`WithHyperIo<I>`] exposes its inner `I`'s [`AsyncWrite`] implementation. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +impl<I> tokio::io::AsyncWrite for WithHyperIo<I> +where + I: tokio::io::AsyncWrite, +{ + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + self.project().inner.poll_write(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + self.project().inner.poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + self.project().inner.poll_shutdown(cx) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + self.project().inner.poll_write_vectored(cx, bufs) + } +} diff --git a/vendor/hyper-util/src/rt/tokio/with_tokio_io.rs b/vendor/hyper-util/src/rt/tokio/with_tokio_io.rs new file mode 100644 index 00000000..223e0ed3 --- /dev/null +++ b/vendor/hyper-util/src/rt/tokio/with_tokio_io.rs @@ -0,0 +1,178 @@ +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Extends an underlying [`hyper`] I/O with [`tokio`] I/O implementations. + /// + /// This implements [`AsyncRead`] and [`AsyncWrite`] given an inner type that implements + /// [`Read`] and [`Write`], respectively. + #[derive(Debug)] + pub struct WithTokioIo<I> { + #[pin] + inner: I, + } +} + +// ==== impl WithTokioIo ===== + +/// [`WithTokioIo<I>`] is [`AsyncRead`] if `I` is [`Read`]. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +/// [`Read`]: hyper::rt::Read +impl<I> tokio::io::AsyncRead for WithTokioIo<I> +where + I: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<Result<(), std::io::Error>> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +/// [`WithTokioIo<I>`] is [`AsyncWrite`] if `I` is [`Write`]. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +/// [`Write`]: hyper::rt::Write +impl<I> tokio::io::AsyncWrite for WithTokioIo<I> +where + I: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +/// [`WithTokioIo<I>`] exposes its inner `I`'s [`Write`] implementation. +/// +/// [`Write`]: hyper::rt::Write +impl<I> hyper::rt::Write for WithTokioIo<I> +where + I: hyper::rt::Write, +{ + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + self.project().inner.poll_write(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + self.project().inner.poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + self.project().inner.poll_shutdown(cx) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + self.project().inner.poll_write_vectored(cx, bufs) + } +} + +impl<I> WithTokioIo<I> { + /// Wraps the inner I/O in an [`WithTokioIo<I>`] + pub fn new(inner: I) -> Self { + Self { inner } + } + + /// Returns a reference to the inner type. + pub fn inner(&self) -> &I { + &self.inner + } + + /// Returns a mutable reference to the inner type. + pub fn inner_mut(&mut self) -> &mut I { + &mut self.inner + } + + /// Consumes this wrapper and returns the inner type. + pub fn into_inner(self) -> I { + self.inner + } +} + +/// [`WithTokioIo<I>`] exposes its inner `I`'s [`Read`] implementation. +/// +/// [`Read`]: hyper::rt::Read +impl<I> hyper::rt::Read for WithTokioIo<I> +where + I: hyper::rt::Read, +{ + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll<Result<(), std::io::Error>> { + self.project().inner.poll_read(cx, buf) + } +} |
