summaryrefslogtreecommitdiff
path: root/vendor/hyper/src/common/io
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/hyper/src/common/io')
-rw-r--r--vendor/hyper/src/common/io/compat.rs150
-rw-r--r--vendor/hyper/src/common/io/mod.rs7
-rw-r--r--vendor/hyper/src/common/io/rewind.rs160
3 files changed, 317 insertions, 0 deletions
diff --git a/vendor/hyper/src/common/io/compat.rs b/vendor/hyper/src/common/io/compat.rs
new file mode 100644
index 00000000..d026b6d3
--- /dev/null
+++ b/vendor/hyper/src/common/io/compat.rs
@@ -0,0 +1,150 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// This adapts from `hyper` IO traits to the ones in Tokio.
+///
+/// This is currently used by `h2`, and by hyper internal unit tests.
+#[derive(Debug)]
+pub(crate) struct Compat<T>(pub(crate) T);
+
+impl<T> Compat<T> {
+ pub(crate) fn new(io: T) -> Self {
+ Compat(io)
+ }
+
+ fn p(self: Pin<&mut Self>) -> Pin<&mut T> {
+ // SAFETY: The simplest of projections. This is just
+ // a wrapper, we don't do anything that would undo the projection.
+ unsafe { self.map_unchecked_mut(|me| &mut me.0) }
+ }
+}
+
+impl<T> tokio::io::AsyncRead for Compat<T>
+where
+ T: crate::rt::Read,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ tbuf: &mut tokio::io::ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let init = tbuf.initialized().len();
+ let filled = tbuf.filled().len();
+ let (new_init, new_filled) = unsafe {
+ let mut buf = crate::rt::ReadBuf::uninit(tbuf.inner_mut());
+ buf.set_init(init);
+ buf.set_filled(filled);
+
+ match crate::rt::Read::poll_read(self.p(), cx, buf.unfilled()) {
+ Poll::Ready(Ok(())) => (buf.init_len(), buf.len()),
+ other => return other,
+ }
+ };
+
+ let n_init = new_init - init;
+ unsafe {
+ tbuf.assume_init(n_init);
+ tbuf.set_filled(new_filled);
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<T> tokio::io::AsyncWrite for Compat<T>
+where
+ T: crate::rt::Write,
+{
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ crate::rt::Write::poll_write(self.p(), cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
+ crate::rt::Write::poll_flush(self.p(), cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ crate::rt::Write::poll_shutdown(self.p(), cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ crate::rt::Write::is_write_vectored(&self.0)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ crate::rt::Write::poll_write_vectored(self.p(), cx, bufs)
+ }
+}
+
+#[cfg(test)]
+impl<T> crate::rt::Read for Compat<T>
+where
+ T: tokio::io::AsyncRead,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ mut buf: crate::rt::ReadBufCursor<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let n = unsafe {
+ let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
+ match tokio::io::AsyncRead::poll_read(self.p(), cx, &mut tbuf) {
+ Poll::Ready(Ok(())) => tbuf.filled().len(),
+ other => return other,
+ }
+ };
+
+ unsafe {
+ buf.advance(n);
+ }
+ Poll::Ready(Ok(()))
+ }
+}
+
+#[cfg(test)]
+impl<T> crate::rt::Write for Compat<T>
+where
+ T: tokio::io::AsyncWrite,
+{
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ tokio::io::AsyncWrite::poll_write(self.p(), cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
+ tokio::io::AsyncWrite::poll_flush(self.p(), cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ tokio::io::AsyncWrite::poll_shutdown(self.p(), cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ tokio::io::AsyncWrite::is_write_vectored(&self.0)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ tokio::io::AsyncWrite::poll_write_vectored(self.p(), cx, bufs)
+ }
+}
diff --git a/vendor/hyper/src/common/io/mod.rs b/vendor/hyper/src/common/io/mod.rs
new file mode 100644
index 00000000..98c297ca
--- /dev/null
+++ b/vendor/hyper/src/common/io/mod.rs
@@ -0,0 +1,7 @@
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
+mod compat;
+mod rewind;
+
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
+pub(crate) use self::compat::Compat;
+pub(crate) use self::rewind::Rewind;
diff --git a/vendor/hyper/src/common/io/rewind.rs b/vendor/hyper/src/common/io/rewind.rs
new file mode 100644
index 00000000..c2556f01
--- /dev/null
+++ b/vendor/hyper/src/common/io/rewind.rs
@@ -0,0 +1,160 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{cmp, io};
+
+use bytes::{Buf, Bytes};
+
+use crate::rt::{Read, ReadBufCursor, Write};
+
+/// Combine a buffer with an IO, rewinding reads to use the buffer.
+#[derive(Debug)]
+pub(crate) struct Rewind<T> {
+ pre: Option<Bytes>,
+ inner: T,
+}
+
+impl<T> Rewind<T> {
+ #[cfg(test)]
+ pub(crate) fn new(io: T) -> Self {
+ Rewind {
+ pre: None,
+ inner: io,
+ }
+ }
+
+ pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self {
+ Rewind {
+ pre: Some(buf),
+ inner: io,
+ }
+ }
+
+ #[cfg(test)]
+ pub(crate) fn rewind(&mut self, bs: Bytes) {
+ debug_assert!(self.pre.is_none());
+ self.pre = Some(bs);
+ }
+
+ pub(crate) fn into_inner(self) -> (T, Bytes) {
+ (self.inner, self.pre.unwrap_or_default())
+ }
+
+ // pub(crate) fn get_mut(&mut self) -> &mut T {
+ // &mut self.inner
+ // }
+}
+
+impl<T> Read for Rewind<T>
+where
+ T: Read + Unpin,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ mut buf: ReadBufCursor<'_>,
+ ) -> Poll<io::Result<()>> {
+ if let Some(mut prefix) = self.pre.take() {
+ // If there are no remaining bytes, let the bytes get dropped.
+ if !prefix.is_empty() {
+ let copy_len = cmp::min(prefix.len(), buf.remaining());
+ // TODO: There should be a way to do following two lines cleaner...
+ buf.put_slice(&prefix[..copy_len]);
+ prefix.advance(copy_len);
+ // Put back what's left
+ if !prefix.is_empty() {
+ self.pre = Some(prefix);
+ }
+
+ return Poll::Ready(Ok(()));
+ }
+ }
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+}
+
+impl<T> Write for Rewind<T>
+where
+ T: Write + Unpin,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.inner.is_write_vectored()
+ }
+}
+
+#[cfg(all(
+ any(feature = "client", feature = "server"),
+ any(feature = "http1", feature = "http2"),
+))]
+#[cfg(test)]
+mod tests {
+ use super::super::Compat;
+ use super::Rewind;
+ use bytes::Bytes;
+ use tokio::io::AsyncReadExt;
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn partial_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Compat::new(Rewind::new(Compat::new(mock)));
+
+ // Read off some bytes, ensure we filled o1
+ let mut buf = [0; 2];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.0.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // At this point we should have read everything that was in the MockStream
+ assert_eq!(&buf, &underlying);
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn full_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Compat::new(Rewind::new(Compat::new(mock)));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.0.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+ }
+}