diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-15 16:37:08 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-17 16:30:22 -0600 |
| commit | 45df4d0d9b577fecee798d672695fe24ff57fb1b (patch) | |
| tree | 1b99bf645035b58e0d6db08c7a83521f41f7a75b /vendor/hyper/src/client/dispatch.rs | |
| parent | f94f79608393d4ab127db63cc41668445ef6b243 (diff) | |
feat: migrate from Cedar to SpiceDB authorization system
This is a major architectural change that replaces the Cedar policy-based
authorization system with SpiceDB's relation-based authorization.
Key changes:
- Migrate from Rust to Go implementation
- Replace Cedar policies with SpiceDB schema and relationships
- Switch from envoy `ext_authz` with Cedar to SpiceDB permission checks
- Update build system and dependencies for Go ecosystem
- Maintain Envoy integration for external authorization
This change enables more flexible permission modeling through SpiceDB's
Google Zanzibar inspired relation-based system, supporting complex
hierarchical permissions that were difficult to express in Cedar.
Breaking change: Existing Cedar policies and Rust-based configuration
will no longer work and need to be migrated to SpiceDB schema.
Diffstat (limited to 'vendor/hyper/src/client/dispatch.rs')
| -rw-r--r-- | vendor/hyper/src/client/dispatch.rs | 510 |
1 files changed, 0 insertions, 510 deletions
diff --git a/vendor/hyper/src/client/dispatch.rs b/vendor/hyper/src/client/dispatch.rs deleted file mode 100644 index 4ae41c50..00000000 --- a/vendor/hyper/src/client/dispatch.rs +++ /dev/null @@ -1,510 +0,0 @@ -use std::task::{Context, Poll}; -#[cfg(feature = "http2")] -use std::{future::Future, pin::Pin}; - -#[cfg(feature = "http2")] -use http::{Request, Response}; -#[cfg(feature = "http2")] -use http_body::Body; -#[cfg(feature = "http2")] -use pin_project_lite::pin_project; -use tokio::sync::{mpsc, oneshot}; - -#[cfg(feature = "http2")] -use crate::{body::Incoming, proto::h2::client::ResponseFutMap}; - -pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>; -pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>; - -/// An error when calling `try_send_request`. -/// -/// There is a possibility of an error occurring on a connection in-between the -/// time that a request is queued and when it is actually written to the IO -/// transport. If that happens, it is safe to return the request back to the -/// caller, as it was never fully sent. -#[derive(Debug)] -pub struct TrySendError<T> { - pub(crate) error: crate::Error, - pub(crate) message: Option<T>, -} - -pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { - let (tx, rx) = mpsc::unbounded_channel(); - let (giver, taker) = want::new(); - let tx = Sender { - #[cfg(feature = "http1")] - buffered_once: false, - giver, - inner: tx, - }; - let rx = Receiver { inner: rx, taker }; - (tx, rx) -} - -/// A bounded sender of requests and callbacks for when responses are ready. -/// -/// While the inner sender is unbounded, the Giver is used to determine -/// if the Receiver is ready for another request. -pub(crate) struct Sender<T, U> { - /// One message is always allowed, even if the Receiver hasn't asked - /// for it yet. This boolean keeps track of whether we've sent one - /// without notice. - #[cfg(feature = "http1")] - buffered_once: bool, - /// The Giver helps watch that the Receiver side has been polled - /// when the queue is empty. This helps us know when a request and - /// response have been fully processed, and a connection is ready - /// for more. - giver: want::Giver, - /// Actually bounded by the Giver, plus `buffered_once`. - inner: mpsc::UnboundedSender<Envelope<T, U>>, -} - -/// An unbounded version. -/// -/// Cannot poll the Giver, but can still use it to determine if the Receiver -/// has been dropped. However, this version can be cloned. -#[cfg(feature = "http2")] -pub(crate) struct UnboundedSender<T, U> { - /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked. - giver: want::SharedGiver, - inner: mpsc::UnboundedSender<Envelope<T, U>>, -} - -impl<T, U> Sender<T, U> { - #[cfg(feature = "http1")] - pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { - self.giver - .poll_want(cx) - .map_err(|_| crate::Error::new_closed()) - } - - #[cfg(feature = "http1")] - pub(crate) fn is_ready(&self) -> bool { - self.giver.is_wanting() - } - - #[cfg(feature = "http1")] - pub(crate) fn is_closed(&self) -> bool { - self.giver.is_canceled() - } - - #[cfg(feature = "http1")] - fn can_send(&mut self) -> bool { - if self.giver.give() || !self.buffered_once { - // If the receiver is ready *now*, then of course we can send. - // - // If the receiver isn't ready yet, but we don't have anything - // in the channel yet, then allow one message. - self.buffered_once = true; - true - } else { - false - } - } - - #[cfg(feature = "http1")] - pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { - if !self.can_send() { - return Err(val); - } - let (tx, rx) = oneshot::channel(); - self.inner - .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) - .map(move |_| rx) - .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) - } - - #[cfg(feature = "http1")] - pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { - if !self.can_send() { - return Err(val); - } - let (tx, rx) = oneshot::channel(); - self.inner - .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) - .map(move |_| rx) - .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) - } - - #[cfg(feature = "http2")] - pub(crate) fn unbound(self) -> UnboundedSender<T, U> { - UnboundedSender { - giver: self.giver.shared(), - inner: self.inner, - } - } -} - -#[cfg(feature = "http2")] -impl<T, U> UnboundedSender<T, U> { - pub(crate) fn is_ready(&self) -> bool { - !self.giver.is_canceled() - } - - pub(crate) fn is_closed(&self) -> bool { - self.giver.is_canceled() - } - - pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { - let (tx, rx) = oneshot::channel(); - self.inner - .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) - .map(move |_| rx) - .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) - } - - pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { - let (tx, rx) = oneshot::channel(); - self.inner - .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) - .map(move |_| rx) - .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) - } -} - -#[cfg(feature = "http2")] -impl<T, U> Clone for UnboundedSender<T, U> { - fn clone(&self) -> Self { - UnboundedSender { - giver: self.giver.clone(), - inner: self.inner.clone(), - } - } -} - -pub(crate) struct Receiver<T, U> { - inner: mpsc::UnboundedReceiver<Envelope<T, U>>, - taker: want::Taker, -} - -impl<T, U> Receiver<T, U> { - pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> { - match self.inner.poll_recv(cx) { - Poll::Ready(item) => { - Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped"))) - } - Poll::Pending => { - self.taker.want(); - Poll::Pending - } - } - } - - #[cfg(feature = "http1")] - pub(crate) fn close(&mut self) { - self.taker.cancel(); - self.inner.close(); - } - - #[cfg(feature = "http1")] - pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> { - use futures_util::FutureExt; - match self.inner.recv().now_or_never() { - Some(Some(mut env)) => env.0.take(), - _ => None, - } - } -} - -impl<T, U> Drop for Receiver<T, U> { - fn drop(&mut self) { - // Notify the giver about the closure first, before dropping - // the mpsc::Receiver. - self.taker.cancel(); - } -} - -struct Envelope<T, U>(Option<(T, Callback<T, U>)>); - -impl<T, U> Drop for Envelope<T, U> { - fn drop(&mut self) { - if let Some((val, cb)) = self.0.take() { - cb.send(Err(TrySendError { - error: crate::Error::new_canceled().with("connection closed"), - message: Some(val), - })); - } - } -} - -pub(crate) enum Callback<T, U> { - #[allow(unused)] - Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>), - NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>), -} - -impl<T, U> Drop for Callback<T, U> { - fn drop(&mut self) { - match self { - Callback::Retry(tx) => { - if let Some(tx) = tx.take() { - let _ = tx.send(Err(TrySendError { - error: dispatch_gone(), - message: None, - })); - } - } - Callback::NoRetry(tx) => { - if let Some(tx) = tx.take() { - let _ = tx.send(Err(dispatch_gone())); - } - } - } - } -} - -#[cold] -fn dispatch_gone() -> crate::Error { - // FIXME(nox): What errors do we want here? - crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() { - "user code panicked" - } else { - "runtime dropped the dispatch task" - }) -} - -impl<T, U> Callback<T, U> { - #[cfg(feature = "http2")] - pub(crate) fn is_canceled(&self) -> bool { - match *self { - Callback::Retry(Some(ref tx)) => tx.is_closed(), - Callback::NoRetry(Some(ref tx)) => tx.is_closed(), - _ => unreachable!(), - } - } - - pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { - match *self { - Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), - Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), - _ => unreachable!(), - } - } - - pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) { - match self { - Callback::Retry(ref mut tx) => { - let _ = tx.take().unwrap().send(val); - } - Callback::NoRetry(ref mut tx) => { - let _ = tx.take().unwrap().send(val.map_err(|e| e.error)); - } - } - } -} - -impl<T> TrySendError<T> { - /// Take the message from this error. - /// - /// The message will not always have been recovered. If an error occurs - /// after the message has been serialized onto the connection, it will not - /// be available here. - pub fn take_message(&mut self) -> Option<T> { - self.message.take() - } - - /// Consumes this to return the inner error. - pub fn into_error(self) -> crate::Error { - self.error - } -} - -#[cfg(feature = "http2")] -pin_project! { - pub struct SendWhen<B> - where - B: Body, - B: 'static, - { - #[pin] - pub(crate) when: ResponseFutMap<B>, - #[pin] - pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>, - } -} - -#[cfg(feature = "http2")] -impl<B> Future for SendWhen<B> -where - B: Body + 'static, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut this = self.project(); - - let mut call_back = this.call_back.take().expect("polled after complete"); - - match Pin::new(&mut this.when).poll(cx) { - Poll::Ready(Ok(res)) => { - call_back.send(Ok(res)); - Poll::Ready(()) - } - Poll::Pending => { - // check if the callback is canceled - match call_back.poll_canceled(cx) { - Poll::Ready(v) => v, - Poll::Pending => { - // Move call_back back to struct before return - this.call_back.set(Some(call_back)); - return Poll::Pending; - } - }; - trace!("send_when canceled"); - Poll::Ready(()) - } - Poll::Ready(Err((error, message))) => { - call_back.send(Err(TrySendError { error, message })); - Poll::Ready(()) - } - } - } -} - -#[cfg(test)] -mod tests { - #[cfg(feature = "nightly")] - extern crate test; - - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - - use super::{channel, Callback, Receiver}; - - #[derive(Debug)] - struct Custom(#[allow(dead_code)] i32); - - impl<T, U> Future for Receiver<T, U> { - type Output = Option<(T, Callback<T, U>)>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - self.poll_recv(cx) - } - } - - /// Helper to check if the future is ready after polling once. - struct PollOnce<'a, F>(&'a mut F); - - impl<F, T> Future for PollOnce<'_, F> - where - F: Future<Output = T> + Unpin, - { - type Output = Option<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match Pin::new(&mut self.0).poll(cx) { - Poll::Ready(_) => Poll::Ready(Some(())), - Poll::Pending => Poll::Ready(None), - } - } - } - - #[cfg(not(miri))] - #[tokio::test] - async fn drop_receiver_sends_cancel_errors() { - let _ = pretty_env_logger::try_init(); - - let (mut tx, mut rx) = channel::<Custom, ()>(); - - // must poll once for try_send to succeed - assert!(PollOnce(&mut rx).await.is_none(), "rx empty"); - - let promise = tx.try_send(Custom(43)).unwrap(); - drop(rx); - - let fulfilled = promise.await; - let err = fulfilled - .expect("fulfilled") - .expect_err("promise should error"); - match (err.error.is_canceled(), err.message) { - (true, Some(_)) => (), - e => panic!("expected Error::Cancel(_), found {:?}", e), - } - } - - #[cfg(not(miri))] - #[tokio::test] - async fn sender_checks_for_want_on_send() { - let (mut tx, mut rx) = channel::<Custom, ()>(); - - // one is allowed to buffer, second is rejected - let _ = tx.try_send(Custom(1)).expect("1 buffered"); - tx.try_send(Custom(2)).expect_err("2 not ready"); - - assert!(PollOnce(&mut rx).await.is_some(), "rx once"); - - // Even though 1 has been popped, only 1 could be buffered for the - // lifetime of the channel. - tx.try_send(Custom(2)).expect_err("2 still not ready"); - - assert!(PollOnce(&mut rx).await.is_none(), "rx empty"); - - let _ = tx.try_send(Custom(2)).expect("2 ready"); - } - - #[cfg(feature = "http2")] - #[test] - fn unbounded_sender_doesnt_bound_on_want() { - let (tx, rx) = channel::<Custom, ()>(); - let mut tx = tx.unbound(); - - let _ = tx.try_send(Custom(1)).unwrap(); - let _ = tx.try_send(Custom(2)).unwrap(); - let _ = tx.try_send(Custom(3)).unwrap(); - - drop(rx); - - let _ = tx.try_send(Custom(4)).unwrap_err(); - } - - #[cfg(feature = "nightly")] - #[bench] - fn giver_queue_throughput(b: &mut test::Bencher) { - use crate::{body::Incoming, Request, Response}; - - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); - let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>(); - - b.iter(move || { - let _ = tx.send(Request::new(Incoming::empty())).unwrap(); - rt.block_on(async { - loop { - let poll_once = PollOnce(&mut rx); - let opt = poll_once.await; - if opt.is_none() { - break; - } - } - }); - }) - } - - #[cfg(feature = "nightly")] - #[bench] - fn giver_queue_not_ready(b: &mut test::Bencher) { - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); - let (_tx, mut rx) = channel::<i32, ()>(); - b.iter(move || { - rt.block_on(async { - let poll_once = PollOnce(&mut rx); - assert!(poll_once.await.is_none()); - }); - }) - } - - #[cfg(feature = "nightly")] - #[bench] - fn giver_queue_cancel(b: &mut test::Bencher) { - let (_tx, mut rx) = channel::<i32, ()>(); - - b.iter(move || { - rx.taker.cancel(); - }) - } -} |
