From 45df4d0d9b577fecee798d672695fe24ff57fb1b Mon Sep 17 00:00:00 2001 From: mo khan Date: Tue, 15 Jul 2025 16:37:08 -0600 Subject: feat: migrate from Cedar to SpiceDB authorization system This is a major architectural change that replaces the Cedar policy-based authorization system with SpiceDB's relation-based authorization. Key changes: - Migrate from Rust to Go implementation - Replace Cedar policies with SpiceDB schema and relationships - Switch from envoy `ext_authz` with Cedar to SpiceDB permission checks - Update build system and dependencies for Go ecosystem - Maintain Envoy integration for external authorization This change enables more flexible permission modeling through SpiceDB's Google Zanzibar inspired relation-based system, supporting complex hierarchical permissions that were difficult to express in Cedar. Breaking change: Existing Cedar policies and Rust-based configuration will no longer work and need to be migrated to SpiceDB schema. --- vendor/hyper/src/common/buf.rs | 150 -------------------------------- vendor/hyper/src/common/date.rs | 138 ------------------------------ vendor/hyper/src/common/io/compat.rs | 150 -------------------------------- vendor/hyper/src/common/io/mod.rs | 7 -- vendor/hyper/src/common/io/rewind.rs | 160 ----------------------------------- vendor/hyper/src/common/mod.rs | 14 --- vendor/hyper/src/common/task.rs | 9 -- vendor/hyper/src/common/time.rs | 79 ----------------- vendor/hyper/src/common/watch.rs | 73 ---------------- 9 files changed, 780 deletions(-) delete mode 100644 vendor/hyper/src/common/buf.rs delete mode 100644 vendor/hyper/src/common/date.rs delete mode 100644 vendor/hyper/src/common/io/compat.rs delete mode 100644 vendor/hyper/src/common/io/mod.rs delete mode 100644 vendor/hyper/src/common/io/rewind.rs delete mode 100644 vendor/hyper/src/common/mod.rs delete mode 100644 vendor/hyper/src/common/task.rs delete mode 100644 vendor/hyper/src/common/time.rs delete mode 100644 vendor/hyper/src/common/watch.rs (limited to 'vendor/hyper/src/common') diff --git a/vendor/hyper/src/common/buf.rs b/vendor/hyper/src/common/buf.rs deleted file mode 100644 index d0007155..00000000 --- a/vendor/hyper/src/common/buf.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::collections::VecDeque; -use std::io::IoSlice; - -use bytes::{Buf, BufMut, Bytes, BytesMut}; - -pub(crate) struct BufList { - bufs: VecDeque, -} - -impl BufList { - pub(crate) fn new() -> BufList { - BufList { - bufs: VecDeque::new(), - } - } - - #[inline] - pub(crate) fn push(&mut self, buf: T) { - debug_assert!(buf.has_remaining()); - self.bufs.push_back(buf); - } - - #[inline] - pub(crate) fn bufs_cnt(&self) -> usize { - self.bufs.len() - } -} - -impl Buf for BufList { - #[inline] - fn remaining(&self) -> usize { - self.bufs.iter().map(|buf| buf.remaining()).sum() - } - - #[inline] - fn chunk(&self) -> &[u8] { - self.bufs.front().map(Buf::chunk).unwrap_or_default() - } - - #[inline] - fn advance(&mut self, mut cnt: usize) { - while cnt > 0 { - { - let front = &mut self.bufs[0]; - let rem = front.remaining(); - if rem > cnt { - front.advance(cnt); - return; - } else { - front.advance(rem); - cnt -= rem; - } - } - self.bufs.pop_front(); - } - } - - #[inline] - fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize { - if dst.is_empty() { - return 0; - } - let mut vecs = 0; - for buf in &self.bufs { - vecs += buf.chunks_vectored(&mut dst[vecs..]); - if vecs == dst.len() { - break; - } - } - vecs - } - - #[inline] - fn copy_to_bytes(&mut self, len: usize) -> Bytes { - // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole - // request can be fulfilled by the front buffer, we can take advantage. - match self.bufs.front_mut() { - Some(front) if front.remaining() == len => { - let b = front.copy_to_bytes(len); - self.bufs.pop_front(); - b - } - Some(front) if front.remaining() > len => front.copy_to_bytes(len), - _ => { - assert!(len <= self.remaining(), "`len` greater than remaining"); - let mut bm = BytesMut::with_capacity(len); - bm.put(self.take(len)); - bm.freeze() - } - } - } -} - -#[cfg(test)] -mod tests { - use std::ptr; - - use super::*; - - fn hello_world_buf() -> BufList { - BufList { - bufs: vec![Bytes::from("Hello"), Bytes::from(" "), Bytes::from("World")].into(), - } - } - - #[test] - fn to_bytes_shorter() { - let mut bufs = hello_world_buf(); - let old_ptr = bufs.chunk().as_ptr(); - let start = bufs.copy_to_bytes(4); - assert_eq!(start, "Hell"); - assert!(ptr::eq(old_ptr, start.as_ptr())); - assert_eq!(bufs.chunk(), b"o"); - assert!(ptr::eq(old_ptr.wrapping_add(4), bufs.chunk().as_ptr())); - assert_eq!(bufs.remaining(), 7); - } - - #[test] - fn to_bytes_eq() { - let mut bufs = hello_world_buf(); - let old_ptr = bufs.chunk().as_ptr(); - let start = bufs.copy_to_bytes(5); - assert_eq!(start, "Hello"); - assert!(ptr::eq(old_ptr, start.as_ptr())); - assert_eq!(bufs.chunk(), b" "); - assert_eq!(bufs.remaining(), 6); - } - - #[test] - fn to_bytes_longer() { - let mut bufs = hello_world_buf(); - let start = bufs.copy_to_bytes(7); - assert_eq!(start, "Hello W"); - assert_eq!(bufs.remaining(), 4); - } - - #[test] - fn one_long_buf_to_bytes() { - let mut buf = BufList::new(); - buf.push(b"Hello World" as &[_]); - assert_eq!(buf.copy_to_bytes(5), "Hello"); - assert_eq!(buf.chunk(), b" World"); - } - - #[test] - #[should_panic(expected = "`len` greater than remaining")] - fn buf_to_bytes_too_many() { - hello_world_buf().copy_to_bytes(42); - } -} diff --git a/vendor/hyper/src/common/date.rs b/vendor/hyper/src/common/date.rs deleted file mode 100644 index 6eae6746..00000000 --- a/vendor/hyper/src/common/date.rs +++ /dev/null @@ -1,138 +0,0 @@ -use std::cell::RefCell; -use std::fmt::{self, Write}; -use std::str; -use std::time::{Duration, SystemTime}; - -#[cfg(feature = "http2")] -use http::header::HeaderValue; -use httpdate::HttpDate; - -// "Sun, 06 Nov 1994 08:49:37 GMT".len() -pub(crate) const DATE_VALUE_LENGTH: usize = 29; - -#[cfg(feature = "http1")] -pub(crate) fn extend(dst: &mut Vec) { - CACHED.with(|cache| { - dst.extend_from_slice(cache.borrow().buffer()); - }) -} - -#[cfg(feature = "http1")] -pub(crate) fn update() { - CACHED.with(|cache| { - cache.borrow_mut().check(); - }) -} - -#[cfg(feature = "http2")] -pub(crate) fn update_and_header_value() -> HeaderValue { - CACHED.with(|cache| { - let mut cache = cache.borrow_mut(); - cache.check(); - cache.header_value.clone() - }) -} - -struct CachedDate { - bytes: [u8; DATE_VALUE_LENGTH], - pos: usize, - #[cfg(feature = "http2")] - header_value: HeaderValue, - next_update: SystemTime, -} - -thread_local!(static CACHED: RefCell = RefCell::new(CachedDate::new())); - -impl CachedDate { - fn new() -> Self { - let mut cache = CachedDate { - bytes: [0; DATE_VALUE_LENGTH], - pos: 0, - #[cfg(feature = "http2")] - header_value: HeaderValue::from_static(""), - next_update: SystemTime::now(), - }; - cache.update(cache.next_update); - cache - } - - fn buffer(&self) -> &[u8] { - &self.bytes[..] - } - - fn check(&mut self) { - let now = SystemTime::now(); - if now > self.next_update { - self.update(now); - } - } - - fn update(&mut self, now: SystemTime) { - self.render(now); - self.next_update = now + Duration::new(1, 0); - } - - fn render(&mut self, now: SystemTime) { - self.pos = 0; - let _ = write!(self, "{}", HttpDate::from(now)); - debug_assert!(self.pos == DATE_VALUE_LENGTH); - self.render_http2(); - } - - #[cfg(feature = "http2")] - fn render_http2(&mut self) { - self.header_value = HeaderValue::from_bytes(self.buffer()) - .expect("Date format should be valid HeaderValue"); - } - - #[cfg(not(feature = "http2"))] - fn render_http2(&mut self) {} -} - -impl fmt::Write for CachedDate { - fn write_str(&mut self, s: &str) -> fmt::Result { - let len = s.len(); - self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); - self.pos += len; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[cfg(feature = "nightly")] - use test::Bencher; - - #[test] - fn test_date_len() { - assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len()); - } - - #[cfg(feature = "nightly")] - #[bench] - fn bench_date_check(b: &mut Bencher) { - let mut date = CachedDate::new(); - // cache the first update - date.check(); - - b.iter(|| { - date.check(); - }); - } - - #[cfg(feature = "nightly")] - #[bench] - fn bench_date_render(b: &mut Bencher) { - let mut date = CachedDate::new(); - let now = SystemTime::now(); - date.render(now); - b.bytes = date.buffer().len() as u64; - - b.iter(|| { - date.render(now); - test::black_box(&date); - }); - } -} diff --git a/vendor/hyper/src/common/io/compat.rs b/vendor/hyper/src/common/io/compat.rs deleted file mode 100644 index d026b6d3..00000000 --- a/vendor/hyper/src/common/io/compat.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// This adapts from `hyper` IO traits to the ones in Tokio. -/// -/// This is currently used by `h2`, and by hyper internal unit tests. -#[derive(Debug)] -pub(crate) struct Compat(pub(crate) T); - -impl Compat { - pub(crate) fn new(io: T) -> Self { - Compat(io) - } - - fn p(self: Pin<&mut Self>) -> Pin<&mut T> { - // SAFETY: The simplest of projections. This is just - // a wrapper, we don't do anything that would undo the projection. - unsafe { self.map_unchecked_mut(|me| &mut me.0) } - } -} - -impl tokio::io::AsyncRead for Compat -where - T: crate::rt::Read, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - tbuf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - let init = tbuf.initialized().len(); - let filled = tbuf.filled().len(); - let (new_init, new_filled) = unsafe { - let mut buf = crate::rt::ReadBuf::uninit(tbuf.inner_mut()); - buf.set_init(init); - buf.set_filled(filled); - - match crate::rt::Read::poll_read(self.p(), cx, buf.unfilled()) { - Poll::Ready(Ok(())) => (buf.init_len(), buf.len()), - other => return other, - } - }; - - let n_init = new_init - init; - unsafe { - tbuf.assume_init(n_init); - tbuf.set_filled(new_filled); - } - - Poll::Ready(Ok(())) - } -} - -impl tokio::io::AsyncWrite for Compat -where - T: crate::rt::Write, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - crate::rt::Write::poll_write(self.p(), cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - crate::rt::Write::poll_flush(self.p(), cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - crate::rt::Write::poll_shutdown(self.p(), cx) - } - - fn is_write_vectored(&self) -> bool { - crate::rt::Write::is_write_vectored(&self.0) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - crate::rt::Write::poll_write_vectored(self.p(), cx, bufs) - } -} - -#[cfg(test)] -impl crate::rt::Read for Compat -where - T: tokio::io::AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut buf: crate::rt::ReadBufCursor<'_>, - ) -> Poll> { - let n = unsafe { - let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); - match tokio::io::AsyncRead::poll_read(self.p(), cx, &mut tbuf) { - Poll::Ready(Ok(())) => tbuf.filled().len(), - other => return other, - } - }; - - unsafe { - buf.advance(n); - } - Poll::Ready(Ok(())) - } -} - -#[cfg(test)] -impl crate::rt::Write for Compat -where - T: tokio::io::AsyncWrite, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write(self.p(), cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - tokio::io::AsyncWrite::poll_flush(self.p(), cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(self.p(), cx) - } - - fn is_write_vectored(&self) -> bool { - tokio::io::AsyncWrite::is_write_vectored(&self.0) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write_vectored(self.p(), cx, bufs) - } -} diff --git a/vendor/hyper/src/common/io/mod.rs b/vendor/hyper/src/common/io/mod.rs deleted file mode 100644 index 98c297ca..00000000 --- a/vendor/hyper/src/common/io/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] -mod compat; -mod rewind; - -#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] -pub(crate) use self::compat::Compat; -pub(crate) use self::rewind::Rewind; diff --git a/vendor/hyper/src/common/io/rewind.rs b/vendor/hyper/src/common/io/rewind.rs deleted file mode 100644 index c2556f01..00000000 --- a/vendor/hyper/src/common/io/rewind.rs +++ /dev/null @@ -1,160 +0,0 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{cmp, io}; - -use bytes::{Buf, Bytes}; - -use crate::rt::{Read, ReadBufCursor, Write}; - -/// Combine a buffer with an IO, rewinding reads to use the buffer. -#[derive(Debug)] -pub(crate) struct Rewind { - pre: Option, - inner: T, -} - -impl Rewind { - #[cfg(test)] - pub(crate) fn new(io: T) -> Self { - Rewind { - pre: None, - inner: io, - } - } - - pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { - Rewind { - pre: Some(buf), - inner: io, - } - } - - #[cfg(test)] - pub(crate) fn rewind(&mut self, bs: Bytes) { - debug_assert!(self.pre.is_none()); - self.pre = Some(bs); - } - - pub(crate) fn into_inner(self) -> (T, Bytes) { - (self.inner, self.pre.unwrap_or_default()) - } - - // pub(crate) fn get_mut(&mut self) -> &mut T { - // &mut self.inner - // } -} - -impl Read for Rewind -where - T: Read + Unpin, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut buf: ReadBufCursor<'_>, - ) -> Poll> { - if let Some(mut prefix) = self.pre.take() { - // If there are no remaining bytes, let the bytes get dropped. - if !prefix.is_empty() { - let copy_len = cmp::min(prefix.len(), buf.remaining()); - // TODO: There should be a way to do following two lines cleaner... - buf.put_slice(&prefix[..copy_len]); - prefix.advance(copy_len); - // Put back what's left - if !prefix.is_empty() { - self.pre = Some(prefix); - } - - return Poll::Ready(Ok(())); - } - } - Pin::new(&mut self.inner).poll_read(cx, buf) - } -} - -impl Write for Rewind -where - T: Write + Unpin, -{ - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.inner).poll_write(cx, buf) - } - - fn poll_write_vectored( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[io::IoSlice<'_>], - ) -> Poll> { - Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_shutdown(cx) - } - - fn is_write_vectored(&self) -> bool { - self.inner.is_write_vectored() - } -} - -#[cfg(all( - any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2"), -))] -#[cfg(test)] -mod tests { - use super::super::Compat; - use super::Rewind; - use bytes::Bytes; - use tokio::io::AsyncReadExt; - - #[cfg(not(miri))] - #[tokio::test] - async fn partial_rewind() { - let underlying = [104, 101, 108, 108, 111]; - - let mock = tokio_test::io::Builder::new().read(&underlying).build(); - - let mut stream = Compat::new(Rewind::new(Compat::new(mock))); - - // Read off some bytes, ensure we filled o1 - let mut buf = [0; 2]; - stream.read_exact(&mut buf).await.expect("read1"); - - // Rewind the stream so that it is as if we never read in the first place. - stream.0.rewind(Bytes::copy_from_slice(&buf[..])); - - let mut buf = [0; 5]; - stream.read_exact(&mut buf).await.expect("read1"); - - // At this point we should have read everything that was in the MockStream - assert_eq!(&buf, &underlying); - } - - #[cfg(not(miri))] - #[tokio::test] - async fn full_rewind() { - let underlying = [104, 101, 108, 108, 111]; - - let mock = tokio_test::io::Builder::new().read(&underlying).build(); - - let mut stream = Compat::new(Rewind::new(Compat::new(mock))); - - let mut buf = [0; 5]; - stream.read_exact(&mut buf).await.expect("read1"); - - // Rewind the stream so that it is as if we never read in the first place. - stream.0.rewind(Bytes::copy_from_slice(&buf[..])); - - let mut buf = [0; 5]; - stream.read_exact(&mut buf).await.expect("read1"); - } -} diff --git a/vendor/hyper/src/common/mod.rs b/vendor/hyper/src/common/mod.rs deleted file mode 100644 index a0c71385..00000000 --- a/vendor/hyper/src/common/mod.rs +++ /dev/null @@ -1,14 +0,0 @@ -#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] -pub(crate) mod buf; -#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -pub(crate) mod date; -pub(crate) mod io; -#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] -pub(crate) mod task; -#[cfg(any( - all(feature = "server", feature = "http1"), - all(any(feature = "client", feature = "server"), feature = "http2"), -))] -pub(crate) mod time; -#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] -pub(crate) mod watch; diff --git a/vendor/hyper/src/common/task.rs b/vendor/hyper/src/common/task.rs deleted file mode 100644 index 41671b14..00000000 --- a/vendor/hyper/src/common/task.rs +++ /dev/null @@ -1,9 +0,0 @@ -use std::task::{Context, Poll}; - -/// A function to help "yield" a future, such that it is re-scheduled immediately. -/// -/// Useful for spin counts, so a future doesn't hog too much time. -pub(crate) fn yield_now(cx: &mut Context<'_>) -> Poll { - cx.waker().wake_by_ref(); - Poll::Pending -} diff --git a/vendor/hyper/src/common/time.rs b/vendor/hyper/src/common/time.rs deleted file mode 100644 index a8d3cc9c..00000000 --- a/vendor/hyper/src/common/time.rs +++ /dev/null @@ -1,79 +0,0 @@ -#[cfg(any( - all(any(feature = "client", feature = "server"), feature = "http2"), - all(feature = "server", feature = "http1"), -))] -use std::time::Duration; -use std::{fmt, sync::Arc}; -use std::{pin::Pin, time::Instant}; - -use crate::rt::Sleep; -use crate::rt::Timer; - -/// A user-provided timer to time background tasks. -#[derive(Clone)] -pub(crate) enum Time { - Timer(Arc), - Empty, -} - -#[cfg(all(feature = "server", feature = "http1"))] -#[derive(Clone, Copy, Debug)] -pub(crate) enum Dur { - Default(Option), - Configured(Option), -} - -impl fmt::Debug for Time { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Time").finish() - } -} - -impl Time { - #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] - pub(crate) fn sleep(&self, duration: Duration) -> Pin> { - match *self { - Time::Empty => { - panic!("You must supply a timer.") - } - Time::Timer(ref t) => t.sleep(duration), - } - } - - #[cfg(feature = "http1")] - pub(crate) fn sleep_until(&self, deadline: Instant) -> Pin> { - match *self { - Time::Empty => { - panic!("You must supply a timer.") - } - Time::Timer(ref t) => t.sleep_until(deadline), - } - } - - pub(crate) fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { - match *self { - Time::Empty => { - panic!("You must supply a timer.") - } - Time::Timer(ref t) => t.reset(sleep, new_deadline), - } - } - - #[cfg(all(feature = "server", feature = "http1"))] - pub(crate) fn check(&self, dur: Dur, name: &'static str) -> Option { - match dur { - Dur::Default(Some(dur)) => match self { - Time::Empty => { - warn!("timeout `{}` has default, but no timer set", name,); - None - } - Time::Timer(..) => Some(dur), - }, - Dur::Configured(Some(dur)) => match self { - Time::Empty => panic!("timeout `{}` set, but no timer set", name,), - Time::Timer(..) => Some(dur), - }, - Dur::Default(None) | Dur::Configured(None) => None, - } - } -} diff --git a/vendor/hyper/src/common/watch.rs b/vendor/hyper/src/common/watch.rs deleted file mode 100644 index ba17d551..00000000 --- a/vendor/hyper/src/common/watch.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! An SPSC broadcast channel. -//! -//! - The value can only be a `usize`. -//! - The consumer is only notified if the value is different. -//! - The value `0` is reserved for closed. - -use futures_util::task::AtomicWaker; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::task; - -type Value = usize; - -pub(crate) const CLOSED: usize = 0; - -pub(crate) fn channel(initial: Value) -> (Sender, Receiver) { - debug_assert!( - initial != CLOSED, - "watch::channel initial state of 0 is reserved" - ); - - let shared = Arc::new(Shared { - value: AtomicUsize::new(initial), - waker: AtomicWaker::new(), - }); - - ( - Sender { - shared: shared.clone(), - }, - Receiver { shared }, - ) -} - -pub(crate) struct Sender { - shared: Arc, -} - -pub(crate) struct Receiver { - shared: Arc, -} - -struct Shared { - value: AtomicUsize, - waker: AtomicWaker, -} - -impl Sender { - pub(crate) fn send(&mut self, value: Value) { - if self.shared.value.swap(value, Ordering::SeqCst) != value { - self.shared.waker.wake(); - } - } -} - -impl Drop for Sender { - fn drop(&mut self) { - self.send(CLOSED); - } -} - -impl Receiver { - pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value { - self.shared.waker.register(cx.waker()); - self.shared.value.load(Ordering::SeqCst) - } - - pub(crate) fn peek(&self) -> Value { - self.shared.value.load(Ordering::Relaxed) - } -} -- cgit v1.2.3