summaryrefslogtreecommitdiff
path: root/vendor/hyper/src/common
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/hyper/src/common')
-rw-r--r--vendor/hyper/src/common/buf.rs150
-rw-r--r--vendor/hyper/src/common/date.rs138
-rw-r--r--vendor/hyper/src/common/io/compat.rs150
-rw-r--r--vendor/hyper/src/common/io/mod.rs7
-rw-r--r--vendor/hyper/src/common/io/rewind.rs160
-rw-r--r--vendor/hyper/src/common/mod.rs14
-rw-r--r--vendor/hyper/src/common/task.rs9
-rw-r--r--vendor/hyper/src/common/time.rs79
-rw-r--r--vendor/hyper/src/common/watch.rs73
9 files changed, 780 insertions, 0 deletions
diff --git a/vendor/hyper/src/common/buf.rs b/vendor/hyper/src/common/buf.rs
new file mode 100644
index 00000000..d0007155
--- /dev/null
+++ b/vendor/hyper/src/common/buf.rs
@@ -0,0 +1,150 @@
+use std::collections::VecDeque;
+use std::io::IoSlice;
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+
+pub(crate) struct BufList<T> {
+ bufs: VecDeque<T>,
+}
+
+impl<T: Buf> BufList<T> {
+ pub(crate) fn new() -> BufList<T> {
+ BufList {
+ bufs: VecDeque::new(),
+ }
+ }
+
+ #[inline]
+ pub(crate) fn push(&mut self, buf: T) {
+ debug_assert!(buf.has_remaining());
+ self.bufs.push_back(buf);
+ }
+
+ #[inline]
+ pub(crate) fn bufs_cnt(&self) -> usize {
+ self.bufs.len()
+ }
+}
+
+impl<T: Buf> Buf for BufList<T> {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.bufs.iter().map(|buf| buf.remaining()).sum()
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ self.bufs.front().map(Buf::chunk).unwrap_or_default()
+ }
+
+ #[inline]
+ fn advance(&mut self, mut cnt: usize) {
+ while cnt > 0 {
+ {
+ let front = &mut self.bufs[0];
+ let rem = front.remaining();
+ if rem > cnt {
+ front.advance(cnt);
+ return;
+ } else {
+ front.advance(rem);
+ cnt -= rem;
+ }
+ }
+ self.bufs.pop_front();
+ }
+ }
+
+ #[inline]
+ fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
+ if dst.is_empty() {
+ return 0;
+ }
+ let mut vecs = 0;
+ for buf in &self.bufs {
+ vecs += buf.chunks_vectored(&mut dst[vecs..]);
+ if vecs == dst.len() {
+ break;
+ }
+ }
+ vecs
+ }
+
+ #[inline]
+ fn copy_to_bytes(&mut self, len: usize) -> Bytes {
+ // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
+ // request can be fulfilled by the front buffer, we can take advantage.
+ match self.bufs.front_mut() {
+ Some(front) if front.remaining() == len => {
+ let b = front.copy_to_bytes(len);
+ self.bufs.pop_front();
+ b
+ }
+ Some(front) if front.remaining() > len => front.copy_to_bytes(len),
+ _ => {
+ assert!(len <= self.remaining(), "`len` greater than remaining");
+ let mut bm = BytesMut::with_capacity(len);
+ bm.put(self.take(len));
+ bm.freeze()
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::ptr;
+
+ use super::*;
+
+ fn hello_world_buf() -> BufList<Bytes> {
+ BufList {
+ bufs: vec![Bytes::from("Hello"), Bytes::from(" "), Bytes::from("World")].into(),
+ }
+ }
+
+ #[test]
+ fn to_bytes_shorter() {
+ let mut bufs = hello_world_buf();
+ let old_ptr = bufs.chunk().as_ptr();
+ let start = bufs.copy_to_bytes(4);
+ assert_eq!(start, "Hell");
+ assert!(ptr::eq(old_ptr, start.as_ptr()));
+ assert_eq!(bufs.chunk(), b"o");
+ assert!(ptr::eq(old_ptr.wrapping_add(4), bufs.chunk().as_ptr()));
+ assert_eq!(bufs.remaining(), 7);
+ }
+
+ #[test]
+ fn to_bytes_eq() {
+ let mut bufs = hello_world_buf();
+ let old_ptr = bufs.chunk().as_ptr();
+ let start = bufs.copy_to_bytes(5);
+ assert_eq!(start, "Hello");
+ assert!(ptr::eq(old_ptr, start.as_ptr()));
+ assert_eq!(bufs.chunk(), b" ");
+ assert_eq!(bufs.remaining(), 6);
+ }
+
+ #[test]
+ fn to_bytes_longer() {
+ let mut bufs = hello_world_buf();
+ let start = bufs.copy_to_bytes(7);
+ assert_eq!(start, "Hello W");
+ assert_eq!(bufs.remaining(), 4);
+ }
+
+ #[test]
+ fn one_long_buf_to_bytes() {
+ let mut buf = BufList::new();
+ buf.push(b"Hello World" as &[_]);
+ assert_eq!(buf.copy_to_bytes(5), "Hello");
+ assert_eq!(buf.chunk(), b" World");
+ }
+
+ #[test]
+ #[should_panic(expected = "`len` greater than remaining")]
+ fn buf_to_bytes_too_many() {
+ hello_world_buf().copy_to_bytes(42);
+ }
+}
diff --git a/vendor/hyper/src/common/date.rs b/vendor/hyper/src/common/date.rs
new file mode 100644
index 00000000..6eae6746
--- /dev/null
+++ b/vendor/hyper/src/common/date.rs
@@ -0,0 +1,138 @@
+use std::cell::RefCell;
+use std::fmt::{self, Write};
+use std::str;
+use std::time::{Duration, SystemTime};
+
+#[cfg(feature = "http2")]
+use http::header::HeaderValue;
+use httpdate::HttpDate;
+
+// "Sun, 06 Nov 1994 08:49:37 GMT".len()
+pub(crate) const DATE_VALUE_LENGTH: usize = 29;
+
+#[cfg(feature = "http1")]
+pub(crate) fn extend(dst: &mut Vec<u8>) {
+ CACHED.with(|cache| {
+ dst.extend_from_slice(cache.borrow().buffer());
+ })
+}
+
+#[cfg(feature = "http1")]
+pub(crate) fn update() {
+ CACHED.with(|cache| {
+ cache.borrow_mut().check();
+ })
+}
+
+#[cfg(feature = "http2")]
+pub(crate) fn update_and_header_value() -> HeaderValue {
+ CACHED.with(|cache| {
+ let mut cache = cache.borrow_mut();
+ cache.check();
+ cache.header_value.clone()
+ })
+}
+
+struct CachedDate {
+ bytes: [u8; DATE_VALUE_LENGTH],
+ pos: usize,
+ #[cfg(feature = "http2")]
+ header_value: HeaderValue,
+ next_update: SystemTime,
+}
+
+thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate::new()));
+
+impl CachedDate {
+ fn new() -> Self {
+ let mut cache = CachedDate {
+ bytes: [0; DATE_VALUE_LENGTH],
+ pos: 0,
+ #[cfg(feature = "http2")]
+ header_value: HeaderValue::from_static(""),
+ next_update: SystemTime::now(),
+ };
+ cache.update(cache.next_update);
+ cache
+ }
+
+ fn buffer(&self) -> &[u8] {
+ &self.bytes[..]
+ }
+
+ fn check(&mut self) {
+ let now = SystemTime::now();
+ if now > self.next_update {
+ self.update(now);
+ }
+ }
+
+ fn update(&mut self, now: SystemTime) {
+ self.render(now);
+ self.next_update = now + Duration::new(1, 0);
+ }
+
+ fn render(&mut self, now: SystemTime) {
+ self.pos = 0;
+ let _ = write!(self, "{}", HttpDate::from(now));
+ debug_assert!(self.pos == DATE_VALUE_LENGTH);
+ self.render_http2();
+ }
+
+ #[cfg(feature = "http2")]
+ fn render_http2(&mut self) {
+ self.header_value = HeaderValue::from_bytes(self.buffer())
+ .expect("Date format should be valid HeaderValue");
+ }
+
+ #[cfg(not(feature = "http2"))]
+ fn render_http2(&mut self) {}
+}
+
+impl fmt::Write for CachedDate {
+ fn write_str(&mut self, s: &str) -> fmt::Result {
+ let len = s.len();
+ self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
+ self.pos += len;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[cfg(feature = "nightly")]
+ use test::Bencher;
+
+ #[test]
+ fn test_date_len() {
+ assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
+ }
+
+ #[cfg(feature = "nightly")]
+ #[bench]
+ fn bench_date_check(b: &mut Bencher) {
+ let mut date = CachedDate::new();
+ // cache the first update
+ date.check();
+
+ b.iter(|| {
+ date.check();
+ });
+ }
+
+ #[cfg(feature = "nightly")]
+ #[bench]
+ fn bench_date_render(b: &mut Bencher) {
+ let mut date = CachedDate::new();
+ let now = SystemTime::now();
+ date.render(now);
+ b.bytes = date.buffer().len() as u64;
+
+ b.iter(|| {
+ date.render(now);
+ test::black_box(&date);
+ });
+ }
+}
diff --git a/vendor/hyper/src/common/io/compat.rs b/vendor/hyper/src/common/io/compat.rs
new file mode 100644
index 00000000..d026b6d3
--- /dev/null
+++ b/vendor/hyper/src/common/io/compat.rs
@@ -0,0 +1,150 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// This adapts from `hyper` IO traits to the ones in Tokio.
+///
+/// This is currently used by `h2`, and by hyper internal unit tests.
+#[derive(Debug)]
+pub(crate) struct Compat<T>(pub(crate) T);
+
+impl<T> Compat<T> {
+ pub(crate) fn new(io: T) -> Self {
+ Compat(io)
+ }
+
+ fn p(self: Pin<&mut Self>) -> Pin<&mut T> {
+ // SAFETY: The simplest of projections. This is just
+ // a wrapper, we don't do anything that would undo the projection.
+ unsafe { self.map_unchecked_mut(|me| &mut me.0) }
+ }
+}
+
+impl<T> tokio::io::AsyncRead for Compat<T>
+where
+ T: crate::rt::Read,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ tbuf: &mut tokio::io::ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let init = tbuf.initialized().len();
+ let filled = tbuf.filled().len();
+ let (new_init, new_filled) = unsafe {
+ let mut buf = crate::rt::ReadBuf::uninit(tbuf.inner_mut());
+ buf.set_init(init);
+ buf.set_filled(filled);
+
+ match crate::rt::Read::poll_read(self.p(), cx, buf.unfilled()) {
+ Poll::Ready(Ok(())) => (buf.init_len(), buf.len()),
+ other => return other,
+ }
+ };
+
+ let n_init = new_init - init;
+ unsafe {
+ tbuf.assume_init(n_init);
+ tbuf.set_filled(new_filled);
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<T> tokio::io::AsyncWrite for Compat<T>
+where
+ T: crate::rt::Write,
+{
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ crate::rt::Write::poll_write(self.p(), cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
+ crate::rt::Write::poll_flush(self.p(), cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ crate::rt::Write::poll_shutdown(self.p(), cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ crate::rt::Write::is_write_vectored(&self.0)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ crate::rt::Write::poll_write_vectored(self.p(), cx, bufs)
+ }
+}
+
+#[cfg(test)]
+impl<T> crate::rt::Read for Compat<T>
+where
+ T: tokio::io::AsyncRead,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ mut buf: crate::rt::ReadBufCursor<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let n = unsafe {
+ let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
+ match tokio::io::AsyncRead::poll_read(self.p(), cx, &mut tbuf) {
+ Poll::Ready(Ok(())) => tbuf.filled().len(),
+ other => return other,
+ }
+ };
+
+ unsafe {
+ buf.advance(n);
+ }
+ Poll::Ready(Ok(()))
+ }
+}
+
+#[cfg(test)]
+impl<T> crate::rt::Write for Compat<T>
+where
+ T: tokio::io::AsyncWrite,
+{
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ tokio::io::AsyncWrite::poll_write(self.p(), cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
+ tokio::io::AsyncWrite::poll_flush(self.p(), cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ tokio::io::AsyncWrite::poll_shutdown(self.p(), cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ tokio::io::AsyncWrite::is_write_vectored(&self.0)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ tokio::io::AsyncWrite::poll_write_vectored(self.p(), cx, bufs)
+ }
+}
diff --git a/vendor/hyper/src/common/io/mod.rs b/vendor/hyper/src/common/io/mod.rs
new file mode 100644
index 00000000..98c297ca
--- /dev/null
+++ b/vendor/hyper/src/common/io/mod.rs
@@ -0,0 +1,7 @@
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
+mod compat;
+mod rewind;
+
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
+pub(crate) use self::compat::Compat;
+pub(crate) use self::rewind::Rewind;
diff --git a/vendor/hyper/src/common/io/rewind.rs b/vendor/hyper/src/common/io/rewind.rs
new file mode 100644
index 00000000..c2556f01
--- /dev/null
+++ b/vendor/hyper/src/common/io/rewind.rs
@@ -0,0 +1,160 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::{cmp, io};
+
+use bytes::{Buf, Bytes};
+
+use crate::rt::{Read, ReadBufCursor, Write};
+
+/// Combine a buffer with an IO, rewinding reads to use the buffer.
+#[derive(Debug)]
+pub(crate) struct Rewind<T> {
+ pre: Option<Bytes>,
+ inner: T,
+}
+
+impl<T> Rewind<T> {
+ #[cfg(test)]
+ pub(crate) fn new(io: T) -> Self {
+ Rewind {
+ pre: None,
+ inner: io,
+ }
+ }
+
+ pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self {
+ Rewind {
+ pre: Some(buf),
+ inner: io,
+ }
+ }
+
+ #[cfg(test)]
+ pub(crate) fn rewind(&mut self, bs: Bytes) {
+ debug_assert!(self.pre.is_none());
+ self.pre = Some(bs);
+ }
+
+ pub(crate) fn into_inner(self) -> (T, Bytes) {
+ (self.inner, self.pre.unwrap_or_default())
+ }
+
+ // pub(crate) fn get_mut(&mut self) -> &mut T {
+ // &mut self.inner
+ // }
+}
+
+impl<T> Read for Rewind<T>
+where
+ T: Read + Unpin,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ mut buf: ReadBufCursor<'_>,
+ ) -> Poll<io::Result<()>> {
+ if let Some(mut prefix) = self.pre.take() {
+ // If there are no remaining bytes, let the bytes get dropped.
+ if !prefix.is_empty() {
+ let copy_len = cmp::min(prefix.len(), buf.remaining());
+ // TODO: There should be a way to do following two lines cleaner...
+ buf.put_slice(&prefix[..copy_len]);
+ prefix.advance(copy_len);
+ // Put back what's left
+ if !prefix.is_empty() {
+ self.pre = Some(prefix);
+ }
+
+ return Poll::Ready(Ok(()));
+ }
+ }
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+}
+
+impl<T> Write for Rewind<T>
+where
+ T: Write + Unpin,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.inner.is_write_vectored()
+ }
+}
+
+#[cfg(all(
+ any(feature = "client", feature = "server"),
+ any(feature = "http1", feature = "http2"),
+))]
+#[cfg(test)]
+mod tests {
+ use super::super::Compat;
+ use super::Rewind;
+ use bytes::Bytes;
+ use tokio::io::AsyncReadExt;
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn partial_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Compat::new(Rewind::new(Compat::new(mock)));
+
+ // Read off some bytes, ensure we filled o1
+ let mut buf = [0; 2];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.0.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // At this point we should have read everything that was in the MockStream
+ assert_eq!(&buf, &underlying);
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn full_rewind() {
+ let underlying = [104, 101, 108, 108, 111];
+
+ let mock = tokio_test::io::Builder::new().read(&underlying).build();
+
+ let mut stream = Compat::new(Rewind::new(Compat::new(mock)));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+
+ // Rewind the stream so that it is as if we never read in the first place.
+ stream.0.rewind(Bytes::copy_from_slice(&buf[..]));
+
+ let mut buf = [0; 5];
+ stream.read_exact(&mut buf).await.expect("read1");
+ }
+}
diff --git a/vendor/hyper/src/common/mod.rs b/vendor/hyper/src/common/mod.rs
new file mode 100644
index 00000000..a0c71385
--- /dev/null
+++ b/vendor/hyper/src/common/mod.rs
@@ -0,0 +1,14 @@
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
+pub(crate) mod buf;
+#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
+pub(crate) mod date;
+pub(crate) mod io;
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
+pub(crate) mod task;
+#[cfg(any(
+ all(feature = "server", feature = "http1"),
+ all(any(feature = "client", feature = "server"), feature = "http2"),
+))]
+pub(crate) mod time;
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
+pub(crate) mod watch;
diff --git a/vendor/hyper/src/common/task.rs b/vendor/hyper/src/common/task.rs
new file mode 100644
index 00000000..41671b14
--- /dev/null
+++ b/vendor/hyper/src/common/task.rs
@@ -0,0 +1,9 @@
+use std::task::{Context, Poll};
+
+/// A function to help "yield" a future, such that it is re-scheduled immediately.
+///
+/// Useful for spin counts, so a future doesn't hog too much time.
+pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<std::convert::Infallible> {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+}
diff --git a/vendor/hyper/src/common/time.rs b/vendor/hyper/src/common/time.rs
new file mode 100644
index 00000000..a8d3cc9c
--- /dev/null
+++ b/vendor/hyper/src/common/time.rs
@@ -0,0 +1,79 @@
+#[cfg(any(
+ all(any(feature = "client", feature = "server"), feature = "http2"),
+ all(feature = "server", feature = "http1"),
+))]
+use std::time::Duration;
+use std::{fmt, sync::Arc};
+use std::{pin::Pin, time::Instant};
+
+use crate::rt::Sleep;
+use crate::rt::Timer;
+
+/// A user-provided timer to time background tasks.
+#[derive(Clone)]
+pub(crate) enum Time {
+ Timer(Arc<dyn Timer + Send + Sync>),
+ Empty,
+}
+
+#[cfg(all(feature = "server", feature = "http1"))]
+#[derive(Clone, Copy, Debug)]
+pub(crate) enum Dur {
+ Default(Option<Duration>),
+ Configured(Option<Duration>),
+}
+
+impl fmt::Debug for Time {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Time").finish()
+ }
+}
+
+impl Time {
+ #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
+ pub(crate) fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
+ match *self {
+ Time::Empty => {
+ panic!("You must supply a timer.")
+ }
+ Time::Timer(ref t) => t.sleep(duration),
+ }
+ }
+
+ #[cfg(feature = "http1")]
+ pub(crate) fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
+ match *self {
+ Time::Empty => {
+ panic!("You must supply a timer.")
+ }
+ Time::Timer(ref t) => t.sleep_until(deadline),
+ }
+ }
+
+ pub(crate) fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
+ match *self {
+ Time::Empty => {
+ panic!("You must supply a timer.")
+ }
+ Time::Timer(ref t) => t.reset(sleep, new_deadline),
+ }
+ }
+
+ #[cfg(all(feature = "server", feature = "http1"))]
+ pub(crate) fn check(&self, dur: Dur, name: &'static str) -> Option<Duration> {
+ match dur {
+ Dur::Default(Some(dur)) => match self {
+ Time::Empty => {
+ warn!("timeout `{}` has default, but no timer set", name,);
+ None
+ }
+ Time::Timer(..) => Some(dur),
+ },
+ Dur::Configured(Some(dur)) => match self {
+ Time::Empty => panic!("timeout `{}` set, but no timer set", name,),
+ Time::Timer(..) => Some(dur),
+ },
+ Dur::Default(None) | Dur::Configured(None) => None,
+ }
+ }
+}
diff --git a/vendor/hyper/src/common/watch.rs b/vendor/hyper/src/common/watch.rs
new file mode 100644
index 00000000..ba17d551
--- /dev/null
+++ b/vendor/hyper/src/common/watch.rs
@@ -0,0 +1,73 @@
+//! An SPSC broadcast channel.
+//!
+//! - The value can only be a `usize`.
+//! - The consumer is only notified if the value is different.
+//! - The value `0` is reserved for closed.
+
+use futures_util::task::AtomicWaker;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+use std::task;
+
+type Value = usize;
+
+pub(crate) const CLOSED: usize = 0;
+
+pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
+ debug_assert!(
+ initial != CLOSED,
+ "watch::channel initial state of 0 is reserved"
+ );
+
+ let shared = Arc::new(Shared {
+ value: AtomicUsize::new(initial),
+ waker: AtomicWaker::new(),
+ });
+
+ (
+ Sender {
+ shared: shared.clone(),
+ },
+ Receiver { shared },
+ )
+}
+
+pub(crate) struct Sender {
+ shared: Arc<Shared>,
+}
+
+pub(crate) struct Receiver {
+ shared: Arc<Shared>,
+}
+
+struct Shared {
+ value: AtomicUsize,
+ waker: AtomicWaker,
+}
+
+impl Sender {
+ pub(crate) fn send(&mut self, value: Value) {
+ if self.shared.value.swap(value, Ordering::SeqCst) != value {
+ self.shared.waker.wake();
+ }
+ }
+}
+
+impl Drop for Sender {
+ fn drop(&mut self) {
+ self.send(CLOSED);
+ }
+}
+
+impl Receiver {
+ pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
+ self.shared.waker.register(cx.waker());
+ self.shared.value.load(Ordering::SeqCst)
+ }
+
+ pub(crate) fn peek(&self) -> Value {
+ self.shared.value.load(Ordering::Relaxed)
+ }
+}