use bytes::Buf; use futures_core::{ready, stream::Stream}; use http_body::{Body, Frame}; use pin_project_lite::pin_project; use std::{ pin::Pin, task::{Context, Poll}, }; pin_project! { /// A body created from a [`Stream`]. #[derive(Clone, Copy, Debug)] pub struct StreamBody { #[pin] stream: S, } } impl StreamBody { /// Create a new `StreamBody`. pub fn new(stream: S) -> Self { Self { stream } } } impl Body for StreamBody where S: Stream, E>>, D: Buf, { type Data = D; type Error = E; fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { match self.project().stream.poll_next(cx) { Poll::Ready(Some(result)) => Poll::Ready(Some(result)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } impl Stream for StreamBody { type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().stream.poll_next(cx) } fn size_hint(&self) -> (usize, Option) { self.stream.size_hint() } } pin_project! { /// A stream created from a [`Body`]. #[derive(Clone, Copy, Debug)] pub struct BodyStream { #[pin] body: B, } } impl BodyStream { /// Create a new `BodyStream`. pub fn new(body: B) -> Self { Self { body } } } impl Body for BodyStream where B: Body, { type Data = B::Data; type Error = B::Error; fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { self.project().body.poll_frame(cx) } } impl Stream for BodyStream where B: Body, { type Item = Result, B::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project().body.poll_frame(cx) { Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } pin_project! { /// A data stream created from a [`Body`]. #[derive(Clone, Copy, Debug)] pub struct BodyDataStream { #[pin] body: B, } } impl BodyDataStream { /// Create a new `BodyDataStream` pub fn new(body: B) -> Self { Self { body } } } impl Stream for BodyDataStream where B: Body, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { return match ready!(self.as_mut().project().body.poll_frame(cx)) { Some(Ok(frame)) => match frame.into_data() { Ok(bytes) => Poll::Ready(Some(Ok(bytes))), Err(_) => continue, }, Some(Err(err)) => Poll::Ready(Some(Err(err))), None => Poll::Ready(None), }; } } } #[cfg(test)] mod tests { use crate::{BodyExt, BodyStream, StreamBody}; use bytes::Bytes; use futures_util::StreamExt; use http_body::Frame; use std::convert::Infallible; #[tokio::test] async fn body_from_stream() { let chunks: Vec> = vec![ Ok(Frame::data(Bytes::from(vec![1]))), Ok(Frame::data(Bytes::from(vec![2]))), Ok(Frame::data(Bytes::from(vec![3]))), ]; let stream = futures_util::stream::iter(chunks); let mut body = StreamBody::new(stream); assert_eq!( body.frame() .await .unwrap() .unwrap() .into_data() .unwrap() .as_ref(), [1] ); assert_eq!( body.frame() .await .unwrap() .unwrap() .into_data() .unwrap() .as_ref(), [2] ); assert_eq!( body.frame() .await .unwrap() .unwrap() .into_data() .unwrap() .as_ref(), [3] ); assert!(body.frame().await.is_none()); } #[tokio::test] async fn stream_from_body() { let chunks: Vec> = vec![ Ok(Frame::data(Bytes::from(vec![1]))), Ok(Frame::data(Bytes::from(vec![2]))), Ok(Frame::data(Bytes::from(vec![3]))), ]; let stream = futures_util::stream::iter(chunks); let body = StreamBody::new(stream); let mut stream = BodyStream::new(body); assert_eq!( stream .next() .await .unwrap() .unwrap() .into_data() .unwrap() .as_ref(), [1] ); assert_eq!( stream .next() .await .unwrap() .unwrap() .into_data() .unwrap() .as_ref(), [2] ); assert_eq!( stream .next() .await .unwrap() .unwrap() .into_data() .unwrap() .as_ref(), [3] ); assert!(stream.next().await.is_none()); } }