summaryrefslogtreecommitdiff
path: root/vendor/async-stream/src
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/async-stream/src')
-rw-r--r--vendor/async-stream/src/async_stream.rs79
-rw-r--r--vendor/async-stream/src/lib.rs242
-rw-r--r--vendor/async-stream/src/next.rs32
-rw-r--r--vendor/async-stream/src/yielder.rs94
4 files changed, 447 insertions, 0 deletions
diff --git a/vendor/async-stream/src/async_stream.rs b/vendor/async-stream/src/async_stream.rs
new file mode 100644
index 00000000..ff408ab7
--- /dev/null
+++ b/vendor/async-stream/src/async_stream.rs
@@ -0,0 +1,79 @@
+use crate::yielder::Receiver;
+
+use futures_core::{FusedStream, Stream};
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ #[doc(hidden)]
+ #[derive(Debug)]
+ pub struct AsyncStream<T, U> {
+ rx: Receiver<T>,
+ done: bool,
+ #[pin]
+ generator: U,
+ }
+}
+
+impl<T, U> AsyncStream<T, U> {
+ #[doc(hidden)]
+ pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
+ AsyncStream {
+ rx,
+ done: false,
+ generator,
+ }
+ }
+}
+
+impl<T, U> FusedStream for AsyncStream<T, U>
+where
+ U: Future<Output = ()>,
+{
+ fn is_terminated(&self) -> bool {
+ self.done
+ }
+}
+
+impl<T, U> Stream for AsyncStream<T, U>
+where
+ U: Future<Output = ()>,
+{
+ type Item = T;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let me = self.project();
+
+ if *me.done {
+ return Poll::Ready(None);
+ }
+
+ let mut dst = None;
+ let res = {
+ let _enter = me.rx.enter(&mut dst);
+ me.generator.poll(cx)
+ };
+
+ *me.done = res.is_ready();
+
+ if dst.is_some() {
+ return Poll::Ready(dst.take());
+ }
+
+ if *me.done {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ if self.done {
+ (0, Some(0))
+ } else {
+ (0, None)
+ }
+ }
+}
diff --git a/vendor/async-stream/src/lib.rs b/vendor/async-stream/src/lib.rs
new file mode 100644
index 00000000..318e404e
--- /dev/null
+++ b/vendor/async-stream/src/lib.rs
@@ -0,0 +1,242 @@
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ unreachable_pub
+)]
+#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
+
+//! Asynchronous stream of elements.
+//!
+//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to
+//! define asynchronous streams of elements. These are implemented using `async`
+//! & `await` notation. This crate works without unstable features.
+//!
+//! The `stream!` macro returns an anonymous type implementing the [`Stream`]
+//! trait. The `Item` associated type is the type of the values yielded from the
+//! stream. The `try_stream!` also returns an anonymous type implementing the
+//! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
+//! `try_stream!` macro supports using `?` notation as part of the
+//! implementation.
+//!
+//! # Usage
+//!
+//! A basic stream yielding numbers. Values are yielded using the `yield`
+//! keyword. The stream block must return `()`.
+//!
+//! ```rust
+//! use async_stream::stream;
+//!
+//! use futures_util::pin_mut;
+//! use futures_util::stream::StreamExt;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let s = stream! {
+//! for i in 0..3 {
+//! yield i;
+//! }
+//! };
+//!
+//! pin_mut!(s); // needed for iteration
+//!
+//! while let Some(value) = s.next().await {
+//! println!("got {}", value);
+//! }
+//! }
+//! ```
+//!
+//! Streams may be returned by using `impl Stream<Item = T>`:
+//!
+//! ```rust
+//! use async_stream::stream;
+//!
+//! use futures_core::stream::Stream;
+//! use futures_util::pin_mut;
+//! use futures_util::stream::StreamExt;
+//!
+//! fn zero_to_three() -> impl Stream<Item = u32> {
+//! stream! {
+//! for i in 0..3 {
+//! yield i;
+//! }
+//! }
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let s = zero_to_three();
+//! pin_mut!(s); // needed for iteration
+//!
+//! while let Some(value) = s.next().await {
+//! println!("got {}", value);
+//! }
+//! }
+//! ```
+//!
+//! Streams may be implemented in terms of other streams - `async-stream` provides `for await`
+//! syntax to assist with this:
+//!
+//! ```rust
+//! use async_stream::stream;
+//!
+//! use futures_core::stream::Stream;
+//! use futures_util::pin_mut;
+//! use futures_util::stream::StreamExt;
+//!
+//! fn zero_to_three() -> impl Stream<Item = u32> {
+//! stream! {
+//! for i in 0..3 {
+//! yield i;
+//! }
+//! }
+//! }
+//!
+//! fn double<S: Stream<Item = u32>>(input: S)
+//! -> impl Stream<Item = u32>
+//! {
+//! stream! {
+//! for await value in input {
+//! yield value * 2;
+//! }
+//! }
+//! }
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let s = double(zero_to_three());
+//! pin_mut!(s); // needed for iteration
+//!
+//! while let Some(value) = s.next().await {
+//! println!("got {}", value);
+//! }
+//! }
+//! ```
+//!
+//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
+//! of the returned stream is `Result` with `Ok` being the value yielded and
+//! `Err` the error type returned by `?`.
+//!
+//! ```rust
+//! use tokio::net::{TcpListener, TcpStream};
+//!
+//! use async_stream::try_stream;
+//! use futures_core::stream::Stream;
+//!
+//! use std::io;
+//! use std::net::SocketAddr;
+//!
+//! fn bind_and_accept(addr: SocketAddr)
+//! -> impl Stream<Item = io::Result<TcpStream>>
+//! {
+//! try_stream! {
+//! let mut listener = TcpListener::bind(addr).await?;
+//!
+//! loop {
+//! let (stream, addr) = listener.accept().await?;
+//! println!("received on {:?}", addr);
+//! yield stream;
+//! }
+//! }
+//! }
+//! ```
+//!
+//! # Implementation
+//!
+//! The `stream!` and `try_stream!` macros are implemented using proc macros.
+//! The macro searches the syntax tree for instances of `yield $expr` and
+//! transforms them into `sender.send($expr).await`.
+//!
+//! The stream uses a lightweight sender to send values from the stream
+//! implementation to the caller. When entering the stream, an `Option<T>` is
+//! stored on the stack. A pointer to the cell is stored in a thread local and
+//! `poll` is called on the async block. When `poll` returns.
+//! `sender.send(value)` stores the value that cell and yields back to the
+//! caller.
+//!
+//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
+
+mod async_stream;
+mod next;
+mod yielder;
+
+/// Asynchronous stream
+///
+/// See [crate](index.html) documentation for more details.
+///
+/// # Examples
+///
+/// ```
+/// use async_stream::stream;
+///
+/// use futures_util::pin_mut;
+/// use futures_util::stream::StreamExt;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let s = stream! {
+/// for i in 0..3 {
+/// yield i;
+/// }
+/// };
+///
+/// pin_mut!(s); // needed for iteration
+///
+/// while let Some(value) = s.next().await {
+/// println!("got {}", value);
+/// }
+/// }
+/// ```
+#[macro_export]
+macro_rules! stream {
+ ($($tt:tt)*) => {
+ $crate::__private::stream_inner!(($crate) $($tt)*)
+ }
+}
+
+/// Asynchronous fallible stream
+///
+/// See [crate](index.html) documentation for more details.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::net::{TcpListener, TcpStream};
+///
+/// use async_stream::try_stream;
+/// use futures_core::stream::Stream;
+///
+/// use std::io;
+/// use std::net::SocketAddr;
+///
+/// fn bind_and_accept(addr: SocketAddr)
+/// -> impl Stream<Item = io::Result<TcpStream>>
+/// {
+/// try_stream! {
+/// let mut listener = TcpListener::bind(addr).await?;
+///
+/// loop {
+/// let (stream, addr) = listener.accept().await?;
+/// println!("received on {:?}", addr);
+/// yield stream;
+/// }
+/// }
+/// }
+/// ```
+#[macro_export]
+macro_rules! try_stream {
+ ($($tt:tt)*) => {
+ $crate::__private::try_stream_inner!(($crate) $($tt)*)
+ }
+}
+
+// Not public API.
+#[doc(hidden)]
+pub mod __private {
+ pub use crate::async_stream::AsyncStream;
+ pub use crate::next::next;
+ pub use async_stream_impl::{stream_inner, try_stream_inner};
+ pub mod yielder {
+ pub use crate::yielder::pair;
+ }
+}
diff --git a/vendor/async-stream/src/next.rs b/vendor/async-stream/src/next.rs
new file mode 100644
index 00000000..7b1e0467
--- /dev/null
+++ b/vendor/async-stream/src/next.rs
@@ -0,0 +1,32 @@
+use futures_core::Stream;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+// This is equivalent to the `futures::StreamExt::next` method.
+// But we want to make this crate dependency as small as possible, so we define our `next` function.
+#[doc(hidden)]
+pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_
+where
+ S: Stream + Unpin,
+{
+ Next { stream }
+}
+
+#[derive(Debug)]
+struct Next<'a, S> {
+ stream: &'a mut S,
+}
+
+impl<S> Unpin for Next<'_, S> where S: Unpin {}
+
+impl<S> Future for Next<'_, S>
+where
+ S: Stream + Unpin,
+{
+ type Output = Option<S::Item>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.stream).poll_next(cx)
+ }
+}
diff --git a/vendor/async-stream/src/yielder.rs b/vendor/async-stream/src/yielder.rs
new file mode 100644
index 00000000..4b7a9442
--- /dev/null
+++ b/vendor/async-stream/src/yielder.rs
@@ -0,0 +1,94 @@
+use std::cell::Cell;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::ptr;
+use std::task::{Context, Poll};
+
+#[derive(Debug)]
+pub struct Sender<T> {
+ _p: PhantomData<fn(T) -> T>,
+}
+
+#[derive(Debug)]
+pub struct Receiver<T> {
+ _p: PhantomData<T>,
+}
+
+pub(crate) struct Enter<'a, T> {
+ _rx: &'a mut Receiver<T>,
+ prev: *mut (),
+}
+
+// Note: It is considered unsound for anyone other than our macros to call
+// this function. This is a private API intended only for calls from our
+// macros, and users should never call it, but some people tend to
+// misinterpret it as fine to call unless it is marked unsafe.
+#[doc(hidden)]
+pub unsafe fn pair<T>() -> (Sender<T>, Receiver<T>) {
+ let tx = Sender { _p: PhantomData };
+ let rx = Receiver { _p: PhantomData };
+ (tx, rx)
+}
+
+// Tracks the pointer to `Option<T>`.
+//
+// TODO: Ensure wakers match?
+thread_local!(static STORE: Cell<*mut ()> = const { Cell::new(ptr::null_mut()) });
+
+// ===== impl Sender =====
+
+impl<T> Sender<T> {
+ pub fn send(&mut self, value: T) -> impl Future<Output = ()> {
+ Send { value: Some(value) }
+ }
+}
+
+struct Send<T> {
+ value: Option<T>,
+}
+
+impl<T> Unpin for Send<T> {}
+
+impl<T> Future for Send<T> {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ if self.value.is_none() {
+ return Poll::Ready(());
+ }
+
+ STORE.with(|cell| {
+ let ptr = cell.get() as *mut Option<T>;
+ let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage");
+
+ if option_ref.is_none() {
+ *option_ref = self.value.take();
+ }
+
+ Poll::Pending
+ })
+ }
+}
+
+// ===== impl Receiver =====
+
+impl<T> Receiver<T> {
+ pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> {
+ let prev = STORE.with(|cell| {
+ let prev = cell.get();
+ cell.set(dst as *mut _ as *mut ());
+ prev
+ });
+
+ Enter { _rx: self, prev }
+ }
+}
+
+// ===== impl Enter =====
+
+impl<'a, T> Drop for Enter<'a, T> {
+ fn drop(&mut self) {
+ STORE.with(|cell| cell.set(self.prev));
+ }
+}