diff options
Diffstat (limited to 'vendor/hyper/src/common')
| -rw-r--r-- | vendor/hyper/src/common/buf.rs | 150 | ||||
| -rw-r--r-- | vendor/hyper/src/common/date.rs | 138 | ||||
| -rw-r--r-- | vendor/hyper/src/common/io/compat.rs | 150 | ||||
| -rw-r--r-- | vendor/hyper/src/common/io/mod.rs | 7 | ||||
| -rw-r--r-- | vendor/hyper/src/common/io/rewind.rs | 160 | ||||
| -rw-r--r-- | vendor/hyper/src/common/mod.rs | 14 | ||||
| -rw-r--r-- | vendor/hyper/src/common/task.rs | 9 | ||||
| -rw-r--r-- | vendor/hyper/src/common/time.rs | 79 | ||||
| -rw-r--r-- | vendor/hyper/src/common/watch.rs | 73 |
9 files changed, 780 insertions, 0 deletions
diff --git a/vendor/hyper/src/common/buf.rs b/vendor/hyper/src/common/buf.rs new file mode 100644 index 00000000..d0007155 --- /dev/null +++ b/vendor/hyper/src/common/buf.rs @@ -0,0 +1,150 @@ +use std::collections::VecDeque; +use std::io::IoSlice; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +pub(crate) struct BufList<T> { + bufs: VecDeque<T>, +} + +impl<T: Buf> BufList<T> { + pub(crate) fn new() -> BufList<T> { + BufList { + bufs: VecDeque::new(), + } + } + + #[inline] + pub(crate) fn push(&mut self, buf: T) { + debug_assert!(buf.has_remaining()); + self.bufs.push_back(buf); + } + + #[inline] + pub(crate) fn bufs_cnt(&self) -> usize { + self.bufs.len() + } +} + +impl<T: Buf> Buf for BufList<T> { + #[inline] + fn remaining(&self) -> usize { + self.bufs.iter().map(|buf| buf.remaining()).sum() + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.bufs.front().map(Buf::chunk).unwrap_or_default() + } + + #[inline] + fn advance(&mut self, mut cnt: usize) { + while cnt > 0 { + { + let front = &mut self.bufs[0]; + let rem = front.remaining(); + if rem > cnt { + front.advance(cnt); + return; + } else { + front.advance(rem); + cnt -= rem; + } + } + self.bufs.pop_front(); + } + } + + #[inline] + fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize { + if dst.is_empty() { + return 0; + } + let mut vecs = 0; + for buf in &self.bufs { + vecs += buf.chunks_vectored(&mut dst[vecs..]); + if vecs == dst.len() { + break; + } + } + vecs + } + + #[inline] + fn copy_to_bytes(&mut self, len: usize) -> Bytes { + // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole + // request can be fulfilled by the front buffer, we can take advantage. + match self.bufs.front_mut() { + Some(front) if front.remaining() == len => { + let b = front.copy_to_bytes(len); + self.bufs.pop_front(); + b + } + Some(front) if front.remaining() > len => front.copy_to_bytes(len), + _ => { + assert!(len <= self.remaining(), "`len` greater than remaining"); + let mut bm = BytesMut::with_capacity(len); + bm.put(self.take(len)); + bm.freeze() + } + } + } +} + +#[cfg(test)] +mod tests { + use std::ptr; + + use super::*; + + fn hello_world_buf() -> BufList<Bytes> { + BufList { + bufs: vec![Bytes::from("Hello"), Bytes::from(" "), Bytes::from("World")].into(), + } + } + + #[test] + fn to_bytes_shorter() { + let mut bufs = hello_world_buf(); + let old_ptr = bufs.chunk().as_ptr(); + let start = bufs.copy_to_bytes(4); + assert_eq!(start, "Hell"); + assert!(ptr::eq(old_ptr, start.as_ptr())); + assert_eq!(bufs.chunk(), b"o"); + assert!(ptr::eq(old_ptr.wrapping_add(4), bufs.chunk().as_ptr())); + assert_eq!(bufs.remaining(), 7); + } + + #[test] + fn to_bytes_eq() { + let mut bufs = hello_world_buf(); + let old_ptr = bufs.chunk().as_ptr(); + let start = bufs.copy_to_bytes(5); + assert_eq!(start, "Hello"); + assert!(ptr::eq(old_ptr, start.as_ptr())); + assert_eq!(bufs.chunk(), b" "); + assert_eq!(bufs.remaining(), 6); + } + + #[test] + fn to_bytes_longer() { + let mut bufs = hello_world_buf(); + let start = bufs.copy_to_bytes(7); + assert_eq!(start, "Hello W"); + assert_eq!(bufs.remaining(), 4); + } + + #[test] + fn one_long_buf_to_bytes() { + let mut buf = BufList::new(); + buf.push(b"Hello World" as &[_]); + assert_eq!(buf.copy_to_bytes(5), "Hello"); + assert_eq!(buf.chunk(), b" World"); + } + + #[test] + #[should_panic(expected = "`len` greater than remaining")] + fn buf_to_bytes_too_many() { + hello_world_buf().copy_to_bytes(42); + } +} diff --git a/vendor/hyper/src/common/date.rs b/vendor/hyper/src/common/date.rs new file mode 100644 index 00000000..6eae6746 --- /dev/null +++ b/vendor/hyper/src/common/date.rs @@ -0,0 +1,138 @@ +use std::cell::RefCell; +use std::fmt::{self, Write}; +use std::str; +use std::time::{Duration, SystemTime}; + +#[cfg(feature = "http2")] +use http::header::HeaderValue; +use httpdate::HttpDate; + +// "Sun, 06 Nov 1994 08:49:37 GMT".len() +pub(crate) const DATE_VALUE_LENGTH: usize = 29; + +#[cfg(feature = "http1")] +pub(crate) fn extend(dst: &mut Vec<u8>) { + CACHED.with(|cache| { + dst.extend_from_slice(cache.borrow().buffer()); + }) +} + +#[cfg(feature = "http1")] +pub(crate) fn update() { + CACHED.with(|cache| { + cache.borrow_mut().check(); + }) +} + +#[cfg(feature = "http2")] +pub(crate) fn update_and_header_value() -> HeaderValue { + CACHED.with(|cache| { + let mut cache = cache.borrow_mut(); + cache.check(); + cache.header_value.clone() + }) +} + +struct CachedDate { + bytes: [u8; DATE_VALUE_LENGTH], + pos: usize, + #[cfg(feature = "http2")] + header_value: HeaderValue, + next_update: SystemTime, +} + +thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate::new())); + +impl CachedDate { + fn new() -> Self { + let mut cache = CachedDate { + bytes: [0; DATE_VALUE_LENGTH], + pos: 0, + #[cfg(feature = "http2")] + header_value: HeaderValue::from_static(""), + next_update: SystemTime::now(), + }; + cache.update(cache.next_update); + cache + } + + fn buffer(&self) -> &[u8] { + &self.bytes[..] + } + + fn check(&mut self) { + let now = SystemTime::now(); + if now > self.next_update { + self.update(now); + } + } + + fn update(&mut self, now: SystemTime) { + self.render(now); + self.next_update = now + Duration::new(1, 0); + } + + fn render(&mut self, now: SystemTime) { + self.pos = 0; + let _ = write!(self, "{}", HttpDate::from(now)); + debug_assert!(self.pos == DATE_VALUE_LENGTH); + self.render_http2(); + } + + #[cfg(feature = "http2")] + fn render_http2(&mut self) { + self.header_value = HeaderValue::from_bytes(self.buffer()) + .expect("Date format should be valid HeaderValue"); + } + + #[cfg(not(feature = "http2"))] + fn render_http2(&mut self) {} +} + +impl fmt::Write for CachedDate { + fn write_str(&mut self, s: &str) -> fmt::Result { + let len = s.len(); + self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); + self.pos += len; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "nightly")] + use test::Bencher; + + #[test] + fn test_date_len() { + assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len()); + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_date_check(b: &mut Bencher) { + let mut date = CachedDate::new(); + // cache the first update + date.check(); + + b.iter(|| { + date.check(); + }); + } + + #[cfg(feature = "nightly")] + #[bench] + fn bench_date_render(b: &mut Bencher) { + let mut date = CachedDate::new(); + let now = SystemTime::now(); + date.render(now); + b.bytes = date.buffer().len() as u64; + + b.iter(|| { + date.render(now); + test::black_box(&date); + }); + } +} diff --git a/vendor/hyper/src/common/io/compat.rs b/vendor/hyper/src/common/io/compat.rs new file mode 100644 index 00000000..d026b6d3 --- /dev/null +++ b/vendor/hyper/src/common/io/compat.rs @@ -0,0 +1,150 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// This adapts from `hyper` IO traits to the ones in Tokio. +/// +/// This is currently used by `h2`, and by hyper internal unit tests. +#[derive(Debug)] +pub(crate) struct Compat<T>(pub(crate) T); + +impl<T> Compat<T> { + pub(crate) fn new(io: T) -> Self { + Compat(io) + } + + fn p(self: Pin<&mut Self>) -> Pin<&mut T> { + // SAFETY: The simplest of projections. This is just + // a wrapper, we don't do anything that would undo the projection. + unsafe { self.map_unchecked_mut(|me| &mut me.0) } + } +} + +impl<T> tokio::io::AsyncRead for Compat<T> +where + T: crate::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll<Result<(), std::io::Error>> { + let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let (new_init, new_filled) = unsafe { + let mut buf = crate::rt::ReadBuf::uninit(tbuf.inner_mut()); + buf.set_init(init); + buf.set_filled(filled); + + match crate::rt::Read::poll_read(self.p(), cx, buf.unfilled()) { + Poll::Ready(Ok(())) => (buf.init_len(), buf.len()), + other => return other, + } + }; + + let n_init = new_init - init; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(new_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl<T> tokio::io::AsyncWrite for Compat<T> +where + T: crate::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + crate::rt::Write::poll_write(self.p(), cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + crate::rt::Write::poll_flush(self.p(), cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + crate::rt::Write::poll_shutdown(self.p(), cx) + } + + fn is_write_vectored(&self) -> bool { + crate::rt::Write::is_write_vectored(&self.0) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + crate::rt::Write::poll_write_vectored(self.p(), cx, bufs) + } +} + +#[cfg(test)] +impl<T> crate::rt::Read for Compat<T> +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: crate::rt::ReadBufCursor<'_>, + ) -> Poll<Result<(), std::io::Error>> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.p(), cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +#[cfg(test)] +impl<T> crate::rt::Write for Compat<T> +where + T: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize, std::io::Error>> { + tokio::io::AsyncWrite::poll_write(self.p(), cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { + tokio::io::AsyncWrite::poll_flush(self.p(), cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), std::io::Error>> { + tokio::io::AsyncWrite::poll_shutdown(self.p(), cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.0) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll<Result<usize, std::io::Error>> { + tokio::io::AsyncWrite::poll_write_vectored(self.p(), cx, bufs) + } +} diff --git a/vendor/hyper/src/common/io/mod.rs b/vendor/hyper/src/common/io/mod.rs new file mode 100644 index 00000000..98c297ca --- /dev/null +++ b/vendor/hyper/src/common/io/mod.rs @@ -0,0 +1,7 @@ +#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] +mod compat; +mod rewind; + +#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] +pub(crate) use self::compat::Compat; +pub(crate) use self::rewind::Rewind; diff --git a/vendor/hyper/src/common/io/rewind.rs b/vendor/hyper/src/common/io/rewind.rs new file mode 100644 index 00000000..c2556f01 --- /dev/null +++ b/vendor/hyper/src/common/io/rewind.rs @@ -0,0 +1,160 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{cmp, io}; + +use bytes::{Buf, Bytes}; + +use crate::rt::{Read, ReadBufCursor, Write}; + +/// Combine a buffer with an IO, rewinding reads to use the buffer. +#[derive(Debug)] +pub(crate) struct Rewind<T> { + pre: Option<Bytes>, + inner: T, +} + +impl<T> Rewind<T> { + #[cfg(test)] + pub(crate) fn new(io: T) -> Self { + Rewind { + pre: None, + inner: io, + } + } + + pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { + Rewind { + pre: Some(buf), + inner: io, + } + } + + #[cfg(test)] + pub(crate) fn rewind(&mut self, bs: Bytes) { + debug_assert!(self.pre.is_none()); + self.pre = Some(bs); + } + + pub(crate) fn into_inner(self) -> (T, Bytes) { + (self.inner, self.pre.unwrap_or_default()) + } + + // pub(crate) fn get_mut(&mut self) -> &mut T { + // &mut self.inner + // } +} + +impl<T> Read for Rewind<T> +where + T: Read + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: ReadBufCursor<'_>, + ) -> Poll<io::Result<()>> { + if let Some(mut prefix) = self.pre.take() { + // If there are no remaining bytes, let the bytes get dropped. + if !prefix.is_empty() { + let copy_len = cmp::min(prefix.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(&prefix[..copy_len]); + prefix.advance(copy_len); + // Put back what's left + if !prefix.is_empty() { + self.pre = Some(prefix); + } + + return Poll::Ready(Ok(())); + } + } + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl<T> Write for Rewind<T> +where + T: Write + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +#[cfg(all( + any(feature = "client", feature = "server"), + any(feature = "http1", feature = "http2"), +))] +#[cfg(test)] +mod tests { + use super::super::Compat; + use super::Rewind; + use bytes::Bytes; + use tokio::io::AsyncReadExt; + + #[cfg(not(miri))] + #[tokio::test] + async fn partial_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Compat::new(Rewind::new(Compat::new(mock))); + + // Read off some bytes, ensure we filled o1 + let mut buf = [0; 2]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.0.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // At this point we should have read everything that was in the MockStream + assert_eq!(&buf, &underlying); + } + + #[cfg(not(miri))] + #[tokio::test] + async fn full_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Compat::new(Rewind::new(Compat::new(mock))); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.0.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + } +} diff --git a/vendor/hyper/src/common/mod.rs b/vendor/hyper/src/common/mod.rs new file mode 100644 index 00000000..a0c71385 --- /dev/null +++ b/vendor/hyper/src/common/mod.rs @@ -0,0 +1,14 @@ +#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] +pub(crate) mod buf; +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +pub(crate) mod date; +pub(crate) mod io; +#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] +pub(crate) mod task; +#[cfg(any( + all(feature = "server", feature = "http1"), + all(any(feature = "client", feature = "server"), feature = "http2"), +))] +pub(crate) mod time; +#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] +pub(crate) mod watch; diff --git a/vendor/hyper/src/common/task.rs b/vendor/hyper/src/common/task.rs new file mode 100644 index 00000000..41671b14 --- /dev/null +++ b/vendor/hyper/src/common/task.rs @@ -0,0 +1,9 @@ +use std::task::{Context, Poll}; + +/// A function to help "yield" a future, such that it is re-scheduled immediately. +/// +/// Useful for spin counts, so a future doesn't hog too much time. +pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll<std::convert::Infallible> { + cx.waker().wake_by_ref(); + Poll::Pending +} diff --git a/vendor/hyper/src/common/time.rs b/vendor/hyper/src/common/time.rs new file mode 100644 index 00000000..a8d3cc9c --- /dev/null +++ b/vendor/hyper/src/common/time.rs @@ -0,0 +1,79 @@ +#[cfg(any( + all(any(feature = "client", feature = "server"), feature = "http2"), + all(feature = "server", feature = "http1"), +))] +use std::time::Duration; +use std::{fmt, sync::Arc}; +use std::{pin::Pin, time::Instant}; + +use crate::rt::Sleep; +use crate::rt::Timer; + +/// A user-provided timer to time background tasks. +#[derive(Clone)] +pub(crate) enum Time { + Timer(Arc<dyn Timer + Send + Sync>), + Empty, +} + +#[cfg(all(feature = "server", feature = "http1"))] +#[derive(Clone, Copy, Debug)] +pub(crate) enum Dur { + Default(Option<Duration>), + Configured(Option<Duration>), +} + +impl fmt::Debug for Time { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Time").finish() + } +} + +impl Time { + #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] + pub(crate) fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> { + match *self { + Time::Empty => { + panic!("You must supply a timer.") + } + Time::Timer(ref t) => t.sleep(duration), + } + } + + #[cfg(feature = "http1")] + pub(crate) fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> { + match *self { + Time::Empty => { + panic!("You must supply a timer.") + } + Time::Timer(ref t) => t.sleep_until(deadline), + } + } + + pub(crate) fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) { + match *self { + Time::Empty => { + panic!("You must supply a timer.") + } + Time::Timer(ref t) => t.reset(sleep, new_deadline), + } + } + + #[cfg(all(feature = "server", feature = "http1"))] + pub(crate) fn check(&self, dur: Dur, name: &'static str) -> Option<Duration> { + match dur { + Dur::Default(Some(dur)) => match self { + Time::Empty => { + warn!("timeout `{}` has default, but no timer set", name,); + None + } + Time::Timer(..) => Some(dur), + }, + Dur::Configured(Some(dur)) => match self { + Time::Empty => panic!("timeout `{}` set, but no timer set", name,), + Time::Timer(..) => Some(dur), + }, + Dur::Default(None) | Dur::Configured(None) => None, + } + } +} diff --git a/vendor/hyper/src/common/watch.rs b/vendor/hyper/src/common/watch.rs new file mode 100644 index 00000000..ba17d551 --- /dev/null +++ b/vendor/hyper/src/common/watch.rs @@ -0,0 +1,73 @@ +//! An SPSC broadcast channel. +//! +//! - The value can only be a `usize`. +//! - The consumer is only notified if the value is different. +//! - The value `0` is reserved for closed. + +use futures_util::task::AtomicWaker; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use std::task; + +type Value = usize; + +pub(crate) const CLOSED: usize = 0; + +pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { + debug_assert!( + initial != CLOSED, + "watch::channel initial state of 0 is reserved" + ); + + let shared = Arc::new(Shared { + value: AtomicUsize::new(initial), + waker: AtomicWaker::new(), + }); + + ( + Sender { + shared: shared.clone(), + }, + Receiver { shared }, + ) +} + +pub(crate) struct Sender { + shared: Arc<Shared>, +} + +pub(crate) struct Receiver { + shared: Arc<Shared>, +} + +struct Shared { + value: AtomicUsize, + waker: AtomicWaker, +} + +impl Sender { + pub(crate) fn send(&mut self, value: Value) { + if self.shared.value.swap(value, Ordering::SeqCst) != value { + self.shared.waker.wake(); + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.send(CLOSED); + } +} + +impl Receiver { + pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value { + self.shared.waker.register(cx.waker()); + self.shared.value.load(Ordering::SeqCst) + } + + pub(crate) fn peek(&self) -> Value { + self.shared.value.load(Ordering::Relaxed) + } +} |
