summaryrefslogtreecommitdiff
path: root/vendor/async-stream/src/async_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/async-stream/src/async_stream.rs')
-rw-r--r--vendor/async-stream/src/async_stream.rs79
1 files changed, 79 insertions, 0 deletions
diff --git a/vendor/async-stream/src/async_stream.rs b/vendor/async-stream/src/async_stream.rs
new file mode 100644
index 00000000..ff408ab7
--- /dev/null
+++ b/vendor/async-stream/src/async_stream.rs
@@ -0,0 +1,79 @@
+use crate::yielder::Receiver;
+
+use futures_core::{FusedStream, Stream};
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ #[doc(hidden)]
+ #[derive(Debug)]
+ pub struct AsyncStream<T, U> {
+ rx: Receiver<T>,
+ done: bool,
+ #[pin]
+ generator: U,
+ }
+}
+
+impl<T, U> AsyncStream<T, U> {
+ #[doc(hidden)]
+ pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
+ AsyncStream {
+ rx,
+ done: false,
+ generator,
+ }
+ }
+}
+
+impl<T, U> FusedStream for AsyncStream<T, U>
+where
+ U: Future<Output = ()>,
+{
+ fn is_terminated(&self) -> bool {
+ self.done
+ }
+}
+
+impl<T, U> Stream for AsyncStream<T, U>
+where
+ U: Future<Output = ()>,
+{
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let me = self.project();
+
+ if *me.done {
+ return Poll::Ready(None);
+ }
+
+ let mut dst = None;
+ let res = {
+ let _enter = me.rx.enter(&mut dst);
+ me.generator.poll(cx)
+ };
+
+ *me.done = res.is_ready();
+
+ if dst.is_some() {
+ return Poll::Ready(dst.take());
+ }
+
+ if *me.done {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.done {
+ (0, Some(0))
+ } else {
+ (0, None)
+ }
+ }
+}