summaryrefslogtreecommitdiff
path: root/vendor/hyper/src/body
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
committermo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
commit8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch)
tree22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/hyper/src/body
parent4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff)
chore: add vendor directory
Diffstat (limited to 'vendor/hyper/src/body')
-rw-r--r--vendor/hyper/src/body/incoming.rs617
-rw-r--r--vendor/hyper/src/body/length.rs129
-rw-r--r--vendor/hyper/src/body/mod.rs50
3 files changed, 796 insertions, 0 deletions
diff --git a/vendor/hyper/src/body/incoming.rs b/vendor/hyper/src/body/incoming.rs
new file mode 100644
index 00000000..dcfb71d5
--- /dev/null
+++ b/vendor/hyper/src/body/incoming.rs
@@ -0,0 +1,617 @@
+use std::fmt;
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use bytes::Bytes;
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+use futures_channel::{mpsc, oneshot};
+#[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+))]
+use futures_util::ready;
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+use http::HeaderMap;
+use http_body::{Body, Frame, SizeHint};
+
+#[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+))]
+use super::DecodedLength;
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+use crate::common::watch;
+#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+use crate::proto::h2::ping;
+
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+type TrailersSender = oneshot::Sender<HeaderMap>;
+
+/// A stream of `Bytes`, used when receiving bodies from the network.
+///
+/// Note that Users should not instantiate this struct directly. When working with the hyper client,
+/// `Incoming` is returned to you in responses. Similarly, when operating with the hyper server,
+/// it is provided within requests.
+///
+/// # Examples
+///
+/// ```rust,ignore
+/// async fn echo(
+/// req: Request<hyper::body::Incoming>,
+/// ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
+/// //Here, you can process `Incoming`
+/// }
+/// ```
+#[must_use = "streams do nothing unless polled"]
+pub struct Incoming {
+ kind: Kind,
+}
+
+enum Kind {
+ Empty,
+ #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+ Chan {
+ content_length: DecodedLength,
+ want_tx: watch::Sender,
+ data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
+ trailers_rx: oneshot::Receiver<HeaderMap>,
+ },
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ H2 {
+ content_length: DecodedLength,
+ data_done: bool,
+ ping: ping::Recorder,
+ recv: h2::RecvStream,
+ },
+ #[cfg(feature = "ffi")]
+ Ffi(crate::ffi::UserBody),
+}
+
+/// A sender half created through [`Body::channel()`].
+///
+/// Useful when wanting to stream chunks from another thread.
+///
+/// ## Body Closing
+///
+/// Note that the request body will always be closed normally when the sender is dropped (meaning
+/// that the empty terminating chunk will be sent to the remote). If you desire to close the
+/// connection with an incomplete response (e.g. in the case of an error during asynchronous
+/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
+///
+/// [`Body::channel()`]: struct.Body.html#method.channel
+/// [`Sender::abort()`]: struct.Sender.html#method.abort
+#[must_use = "Sender does nothing unless sent on"]
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+pub(crate) struct Sender {
+ want_rx: watch::Receiver,
+ data_tx: BodySender,
+ trailers_tx: Option<TrailersSender>,
+}
+
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+const WANT_PENDING: usize = 1;
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+const WANT_READY: usize = 2;
+
+impl Incoming {
+ /// Create a `Body` stream with an associated sender half.
+ ///
+ /// Useful when wanting to stream chunks from another thread.
+ #[inline]
+ #[cfg(test)]
+ pub(crate) fn channel() -> (Sender, Incoming) {
+ Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
+ }
+
+ #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+ pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
+ let (data_tx, data_rx) = mpsc::channel(0);
+ let (trailers_tx, trailers_rx) = oneshot::channel();
+
+ // If wanter is true, `Sender::poll_ready()` won't becoming ready
+ // until the `Body` has been polled for data once.
+ let want = if wanter { WANT_PENDING } else { WANT_READY };
+
+ let (want_tx, want_rx) = watch::channel(want);
+
+ let tx = Sender {
+ want_rx,
+ data_tx,
+ trailers_tx: Some(trailers_tx),
+ };
+ let rx = Incoming::new(Kind::Chan {
+ content_length,
+ want_tx,
+ data_rx,
+ trailers_rx,
+ });
+
+ (tx, rx)
+ }
+
+ fn new(kind: Kind) -> Incoming {
+ Incoming { kind }
+ }
+
+ #[allow(dead_code)]
+ pub(crate) fn empty() -> Incoming {
+ Incoming::new(Kind::Empty)
+ }
+
+ #[cfg(feature = "ffi")]
+ pub(crate) fn ffi() -> Incoming {
+ Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
+ }
+
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ pub(crate) fn h2(
+ recv: h2::RecvStream,
+ mut content_length: DecodedLength,
+ ping: ping::Recorder,
+ ) -> Self {
+ // If the stream is already EOS, then the "unknown length" is clearly
+ // actually ZERO.
+ if !content_length.is_exact() && recv.is_end_stream() {
+ content_length = DecodedLength::ZERO;
+ }
+
+ Incoming::new(Kind::H2 {
+ data_done: false,
+ ping,
+ content_length,
+ recv,
+ })
+ }
+
+ #[cfg(feature = "ffi")]
+ pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
+ match self.kind {
+ Kind::Ffi(ref mut body) => return body,
+ _ => {
+ self.kind = Kind::Ffi(crate::ffi::UserBody::new());
+ }
+ }
+
+ match self.kind {
+ Kind::Ffi(ref mut body) => body,
+ _ => unreachable!(),
+ }
+ }
+}
+
+impl Body for Incoming {
+ type Data = Bytes;
+ type Error = crate::Error;
+
+ fn poll_frame(
+ #[cfg_attr(
+ not(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+ )),
+ allow(unused_mut)
+ )]
+ mut self: Pin<&mut Self>,
+ #[cfg_attr(
+ not(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+ )),
+ allow(unused_variables)
+ )]
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ match self.kind {
+ Kind::Empty => Poll::Ready(None),
+ #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+ Kind::Chan {
+ content_length: ref mut len,
+ ref mut data_rx,
+ ref mut want_tx,
+ ref mut trailers_rx,
+ } => {
+ want_tx.send(WANT_READY);
+
+ if !data_rx.is_terminated() {
+ if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
+ len.sub_if(chunk.len() as u64);
+ return Poll::Ready(Some(Ok(Frame::data(chunk))));
+ }
+ }
+
+ // check trailers after data is terminated
+ match ready!(Pin::new(trailers_rx).poll(cx)) {
+ Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
+ Err(_) => Poll::Ready(None),
+ }
+ }
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ Kind::H2 {
+ ref mut data_done,
+ ref ping,
+ recv: ref mut h2,
+ content_length: ref mut len,
+ } => {
+ if !*data_done {
+ match ready!(h2.poll_data(cx)) {
+ Some(Ok(bytes)) => {
+ let _ = h2.flow_control().release_capacity(bytes.len());
+ len.sub_if(bytes.len() as u64);
+ ping.record_data(bytes.len());
+ return Poll::Ready(Some(Ok(Frame::data(bytes))));
+ }
+ Some(Err(e)) => {
+ return match e.reason() {
+ // These reasons should cause the body reading to stop, but not fail it.
+ // The same logic as for `Read for H2Upgraded` is applied here.
+ Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
+ Poll::Ready(None)
+ }
+ _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
+ };
+ }
+ None => {
+ *data_done = true;
+ // fall through to trailers
+ }
+ }
+ }
+
+ // after data, check trailers
+ match ready!(h2.poll_trailers(cx)) {
+ Ok(t) => {
+ ping.record_non_data();
+ Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
+ }
+ Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
+ }
+ }
+
+ #[cfg(feature = "ffi")]
+ Kind::Ffi(ref mut body) => body.poll_data(cx),
+ }
+ }
+
+ fn is_end_stream(&self) -> bool {
+ match self.kind {
+ Kind::Empty => true,
+ #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+ Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
+ #[cfg(feature = "ffi")]
+ Kind::Ffi(..) => false,
+ }
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ #[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+ ))]
+ fn opt_len(decoded_length: DecodedLength) -> SizeHint {
+ if let Some(content_length) = decoded_length.into_opt() {
+ SizeHint::with_exact(content_length)
+ } else {
+ SizeHint::default()
+ }
+ }
+
+ match self.kind {
+ Kind::Empty => SizeHint::with_exact(0),
+ #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+ Kind::Chan { content_length, .. } => opt_len(content_length),
+ #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
+ Kind::H2 { content_length, .. } => opt_len(content_length),
+ #[cfg(feature = "ffi")]
+ Kind::Ffi(..) => SizeHint::default(),
+ }
+ }
+}
+
+impl fmt::Debug for Incoming {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ #[cfg(any(
+ all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+ ),
+ feature = "ffi"
+ ))]
+ #[derive(Debug)]
+ struct Streaming;
+ #[derive(Debug)]
+ struct Empty;
+
+ let mut builder = f.debug_tuple("Body");
+ match self.kind {
+ Kind::Empty => builder.field(&Empty),
+ #[cfg(any(
+ all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+ ),
+ feature = "ffi"
+ ))]
+ _ => builder.field(&Streaming),
+ };
+
+ builder.finish()
+ }
+}
+
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+impl Sender {
+ /// Check to see if this `Sender` can send more data.
+ pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
+ // Check if the receiver end has tried polling for the body yet
+ ready!(self.poll_want(cx)?);
+ self.data_tx
+ .poll_ready(cx)
+ .map_err(|_| crate::Error::new_closed())
+ }
+
+ fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
+ match self.want_rx.load(cx) {
+ WANT_READY => Poll::Ready(Ok(())),
+ WANT_PENDING => Poll::Pending,
+ watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
+ unexpected => unreachable!("want_rx value: {}", unexpected),
+ }
+ }
+
+ #[cfg(test)]
+ async fn ready(&mut self) -> crate::Result<()> {
+ futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
+ }
+
+ /// Send data on data channel when it is ready.
+ #[cfg(test)]
+ #[allow(unused)]
+ pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
+ self.ready().await?;
+ self.data_tx
+ .try_send(Ok(chunk))
+ .map_err(|_| crate::Error::new_closed())
+ }
+
+ /// Send trailers on trailers channel.
+ #[allow(unused)]
+ pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
+ let tx = match self.trailers_tx.take() {
+ Some(tx) => tx,
+ None => return Err(crate::Error::new_closed()),
+ };
+ tx.send(trailers).map_err(|_| crate::Error::new_closed())
+ }
+
+ /// Try to send data on this channel.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Err(Bytes)` if the channel could not (currently) accept
+ /// another `Bytes`.
+ ///
+ /// # Note
+ ///
+ /// This is mostly useful for when trying to send from some other thread
+ /// that doesn't have an async context. If in an async context, prefer
+ /// `send_data()` instead.
+ #[cfg(feature = "http1")]
+ pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
+ self.data_tx
+ .try_send(Ok(chunk))
+ .map_err(|err| err.into_inner().expect("just sent Ok"))
+ }
+
+ #[cfg(feature = "http1")]
+ pub(crate) fn try_send_trailers(
+ &mut self,
+ trailers: HeaderMap,
+ ) -> Result<(), Option<HeaderMap>> {
+ let tx = match self.trailers_tx.take() {
+ Some(tx) => tx,
+ None => return Err(None),
+ };
+
+ tx.send(trailers).map_err(Some)
+ }
+
+ #[cfg(test)]
+ pub(crate) fn abort(mut self) {
+ self.send_error(crate::Error::new_body_write_aborted());
+ }
+
+ pub(crate) fn send_error(&mut self, err: crate::Error) {
+ let _ = self
+ .data_tx
+ // clone so the send works even if buffer is full
+ .clone()
+ .try_send(Err(err));
+ }
+}
+
+#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
+impl fmt::Debug for Sender {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ #[derive(Debug)]
+ struct Open;
+ #[derive(Debug)]
+ struct Closed;
+
+ let mut builder = f.debug_tuple("Sender");
+ match self.want_rx.peek() {
+ watch::CLOSED => builder.field(&Closed),
+ _ => builder.field(&Open),
+ };
+
+ builder.finish()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::mem;
+ use std::task::Poll;
+
+ use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
+ use http_body_util::BodyExt;
+
+ #[test]
+ fn test_size_of() {
+ // These are mostly to help catch *accidentally* increasing
+ // the size by too much.
+
+ let body_size = mem::size_of::<Incoming>();
+ let body_expected_size = mem::size_of::<u64>() * 5;
+ assert!(
+ body_size <= body_expected_size,
+ "Body size = {} <= {}",
+ body_size,
+ body_expected_size,
+ );
+
+ //assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");
+
+ assert_eq!(
+ mem::size_of::<Sender>(),
+ mem::size_of::<usize>() * 5,
+ "Sender"
+ );
+
+ assert_eq!(
+ mem::size_of::<Sender>(),
+ mem::size_of::<Option<Sender>>(),
+ "Option<Sender>"
+ );
+ }
+
+ #[test]
+ fn size_hint() {
+ fn eq(body: Incoming, b: SizeHint, note: &str) {
+ let a = body.size_hint();
+ assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
+ assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
+ }
+
+ eq(Incoming::empty(), SizeHint::with_exact(0), "empty");
+
+ eq(Incoming::channel().1, SizeHint::new(), "channel");
+
+ eq(
+ Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
+ SizeHint::with_exact(4),
+ "channel with length",
+ );
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn channel_abort() {
+ let (tx, mut rx) = Incoming::channel();
+
+ tx.abort();
+
+ let err = rx.frame().await.unwrap().unwrap_err();
+ assert!(err.is_body_write_aborted(), "{:?}", err);
+ }
+
+ #[cfg(all(not(miri), feature = "http1"))]
+ #[tokio::test]
+ async fn channel_abort_when_buffer_is_full() {
+ let (mut tx, mut rx) = Incoming::channel();
+
+ tx.try_send_data("chunk 1".into()).expect("send 1");
+ // buffer is full, but can still send abort
+ tx.abort();
+
+ let chunk1 = rx
+ .frame()
+ .await
+ .expect("item 1")
+ .expect("chunk 1")
+ .into_data()
+ .unwrap();
+ assert_eq!(chunk1, "chunk 1");
+
+ let err = rx.frame().await.unwrap().unwrap_err();
+ assert!(err.is_body_write_aborted(), "{:?}", err);
+ }
+
+ #[cfg(feature = "http1")]
+ #[test]
+ fn channel_buffers_one() {
+ let (mut tx, _rx) = Incoming::channel();
+
+ tx.try_send_data("chunk 1".into()).expect("send 1");
+
+ // buffer is now full
+ let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
+ assert_eq!(chunk2, "chunk 2");
+ }
+
+ #[cfg(not(miri))]
+ #[tokio::test]
+ async fn channel_empty() {
+ let (_, mut rx) = Incoming::channel();
+
+ assert!(rx.frame().await.is_none());
+ }
+
+ #[test]
+ fn channel_ready() {
+ let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
+
+ let mut tx_ready = tokio_test::task::spawn(tx.ready());
+
+ assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
+ }
+
+ #[test]
+ fn channel_wanter() {
+ let (mut tx, mut rx) =
+ Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
+
+ let mut tx_ready = tokio_test::task::spawn(tx.ready());
+ let mut rx_data = tokio_test::task::spawn(rx.frame());
+
+ assert!(
+ tx_ready.poll().is_pending(),
+ "tx isn't ready before rx has been polled"
+ );
+
+ assert!(rx_data.poll().is_pending(), "poll rx.data");
+ assert!(tx_ready.is_woken(), "rx poll wakes tx");
+
+ assert!(
+ tx_ready.poll().is_ready(),
+ "tx is ready after rx has been polled"
+ );
+ }
+
+ #[test]
+ fn channel_notices_closure() {
+ let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
+
+ let mut tx_ready = tokio_test::task::spawn(tx.ready());
+
+ assert!(
+ tx_ready.poll().is_pending(),
+ "tx isn't ready before rx has been polled"
+ );
+
+ drop(rx);
+ assert!(tx_ready.is_woken(), "dropping rx wakes tx");
+
+ match tx_ready.poll() {
+ Poll::Ready(Err(ref e)) if e.is_closed() => (),
+ unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
+ }
+ }
+}
diff --git a/vendor/hyper/src/body/length.rs b/vendor/hyper/src/body/length.rs
new file mode 100644
index 00000000..e5eab744
--- /dev/null
+++ b/vendor/hyper/src/body/length.rs
@@ -0,0 +1,129 @@
+use std::fmt;
+
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub(crate) struct DecodedLength(u64);
+
+#[cfg(any(feature = "http1", feature = "http2"))]
+impl From<Option<u64>> for DecodedLength {
+ fn from(len: Option<u64>) -> Self {
+ len.and_then(|len| {
+ // If the length is u64::MAX, oh well, just reported chunked.
+ Self::checked_new(len).ok()
+ })
+ .unwrap_or(DecodedLength::CHUNKED)
+ }
+}
+
+#[cfg(any(feature = "http1", feature = "http2", test))]
+const MAX_LEN: u64 = u64::MAX - 2;
+
+impl DecodedLength {
+ pub(crate) const CLOSE_DELIMITED: DecodedLength = DecodedLength(u64::MAX);
+ pub(crate) const CHUNKED: DecodedLength = DecodedLength(u64::MAX - 1);
+ pub(crate) const ZERO: DecodedLength = DecodedLength(0);
+
+ #[cfg(test)]
+ pub(crate) fn new(len: u64) -> Self {
+ debug_assert!(len <= MAX_LEN);
+ DecodedLength(len)
+ }
+
+ /// Takes the length as a content-length without other checks.
+ ///
+ /// Should only be called if previously confirmed this isn't
+ /// CLOSE_DELIMITED or CHUNKED.
+ #[inline]
+ #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
+ pub(crate) fn danger_len(self) -> u64 {
+ debug_assert!(self.0 < Self::CHUNKED.0);
+ self.0
+ }
+
+ /// Converts to an Option<u64> representing a Known or Unknown length.
+ #[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+ ))]
+ pub(crate) fn into_opt(self) -> Option<u64> {
+ match self {
+ DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => None,
+ DecodedLength(known) => Some(known),
+ }
+ }
+
+ /// Checks the `u64` is within the maximum allowed for content-length.
+ #[cfg(any(feature = "http1", feature = "http2"))]
+ pub(crate) fn checked_new(len: u64) -> Result<Self, crate::error::Parse> {
+ if len <= MAX_LEN {
+ Ok(DecodedLength(len))
+ } else {
+ warn!("content-length bigger than maximum: {} > {}", len, MAX_LEN);
+ Err(crate::error::Parse::TooLarge)
+ }
+ }
+
+ #[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+ ))]
+ pub(crate) fn sub_if(&mut self, amt: u64) {
+ match *self {
+ DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => (),
+ DecodedLength(ref mut known) => {
+ *known -= amt;
+ }
+ }
+ }
+
+ /// Returns whether this represents an exact length.
+ ///
+ /// This includes 0, which of course is an exact known length.
+ ///
+ /// It would return false if "chunked" or otherwise size-unknown.
+ #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
+ pub(crate) fn is_exact(&self) -> bool {
+ self.0 <= MAX_LEN
+ }
+}
+
+impl fmt::Debug for DecodedLength {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ DecodedLength::CLOSE_DELIMITED => f.write_str("CLOSE_DELIMITED"),
+ DecodedLength::CHUNKED => f.write_str("CHUNKED"),
+ DecodedLength(n) => f.debug_tuple("DecodedLength").field(&n).finish(),
+ }
+ }
+}
+
+impl fmt::Display for DecodedLength {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ DecodedLength::CLOSE_DELIMITED => f.write_str("close-delimited"),
+ DecodedLength::CHUNKED => f.write_str("chunked encoding"),
+ DecodedLength::ZERO => f.write_str("empty"),
+ DecodedLength(n) => write!(f, "content-length ({} bytes)", n),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn sub_if_known() {
+ let mut len = DecodedLength::new(30);
+ len.sub_if(20);
+
+ assert_eq!(len.0, 10);
+ }
+
+ #[test]
+ fn sub_if_chunked() {
+ let mut len = DecodedLength::CHUNKED;
+ len.sub_if(20);
+
+ assert_eq!(len, DecodedLength::CHUNKED);
+ }
+}
diff --git a/vendor/hyper/src/body/mod.rs b/vendor/hyper/src/body/mod.rs
new file mode 100644
index 00000000..7b71d98b
--- /dev/null
+++ b/vendor/hyper/src/body/mod.rs
@@ -0,0 +1,50 @@
+//! Streaming bodies for Requests and Responses
+//!
+//! For both [Clients](crate::client) and [Servers](crate::server), requests and
+//! responses use streaming bodies, instead of complete buffering. This
+//! allows applications to not use memory they don't need, and allows exerting
+//! back-pressure on connections by only reading when asked.
+//!
+//! There are two pieces to this in hyper:
+//!
+//! - **The [`Body`] trait** describes all possible bodies.
+//! hyper allows any body type that implements `Body`, allowing
+//! applications to have fine-grained control over their streaming.
+//! - **The [`Incoming`] concrete type**, which is an implementation
+//! of `Body`, and returned by hyper as a "receive stream" (so, for server
+//! requests and client responses).
+//!
+//! There are additional implementations available in [`http-body-util`][],
+//! such as a `Full` or `Empty` body.
+//!
+//! [`http-body-util`]: https://docs.rs/http-body-util
+
+pub use bytes::{Buf, Bytes};
+pub use http_body::Body;
+pub use http_body::Frame;
+pub use http_body::SizeHint;
+
+pub use self::incoming::Incoming;
+
+#[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))]
+pub(crate) use self::incoming::Sender;
+#[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+))]
+pub(crate) use self::length::DecodedLength;
+
+mod incoming;
+#[cfg(all(
+ any(feature = "http1", feature = "http2"),
+ any(feature = "client", feature = "server")
+))]
+mod length;
+
+fn _assert_send_sync() {
+ fn _assert_send<T: Send>() {}
+ fn _assert_sync<T: Sync>() {}
+
+ _assert_send::<Incoming>();
+ _assert_sync::<Incoming>();
+}