diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-02 18:36:06 -0600 |
| commit | 8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch) | |
| tree | 22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/async-stream/src | |
| parent | 4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff) | |
chore: add vendor directory
Diffstat (limited to 'vendor/async-stream/src')
| -rw-r--r-- | vendor/async-stream/src/async_stream.rs | 79 | ||||
| -rw-r--r-- | vendor/async-stream/src/lib.rs | 242 | ||||
| -rw-r--r-- | vendor/async-stream/src/next.rs | 32 | ||||
| -rw-r--r-- | vendor/async-stream/src/yielder.rs | 94 |
4 files changed, 447 insertions, 0 deletions
diff --git a/vendor/async-stream/src/async_stream.rs b/vendor/async-stream/src/async_stream.rs new file mode 100644 index 00000000..ff408ab7 --- /dev/null +++ b/vendor/async-stream/src/async_stream.rs @@ -0,0 +1,79 @@ +use crate::yielder::Receiver; + +use futures_core::{FusedStream, Stream}; +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[derive(Debug)] + pub struct AsyncStream<T, U> { + rx: Receiver<T>, + done: bool, + #[pin] + generator: U, + } +} + +impl<T, U> AsyncStream<T, U> { + #[doc(hidden)] + pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> { + AsyncStream { + rx, + done: false, + generator, + } + } +} + +impl<T, U> FusedStream for AsyncStream<T, U> +where + U: Future<Output = ()>, +{ + fn is_terminated(&self) -> bool { + self.done + } +} + +impl<T, U> Stream for AsyncStream<T, U> +where + U: Future<Output = ()>, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let me = self.project(); + + if *me.done { + return Poll::Ready(None); + } + + let mut dst = None; + let res = { + let _enter = me.rx.enter(&mut dst); + me.generator.poll(cx) + }; + + *me.done = res.is_ready(); + + if dst.is_some() { + return Poll::Ready(dst.take()); + } + + if *me.done { + Poll::Ready(None) + } else { + Poll::Pending + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + if self.done { + (0, Some(0)) + } else { + (0, None) + } + } +} diff --git a/vendor/async-stream/src/lib.rs b/vendor/async-stream/src/lib.rs new file mode 100644 index 00000000..318e404e --- /dev/null +++ b/vendor/async-stream/src/lib.rs @@ -0,0 +1,242 @@ +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + unreachable_pub +)] +#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))] + +//! Asynchronous stream of elements. +//! +//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to +//! define asynchronous streams of elements. These are implemented using `async` +//! & `await` notation. This crate works without unstable features. +//! +//! The `stream!` macro returns an anonymous type implementing the [`Stream`] +//! trait. The `Item` associated type is the type of the values yielded from the +//! stream. The `try_stream!` also returns an anonymous type implementing the +//! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The +//! `try_stream!` macro supports using `?` notation as part of the +//! implementation. +//! +//! # Usage +//! +//! A basic stream yielding numbers. Values are yielded using the `yield` +//! keyword. The stream block must return `()`. +//! +//! ```rust +//! use async_stream::stream; +//! +//! use futures_util::pin_mut; +//! use futures_util::stream::StreamExt; +//! +//! #[tokio::main] +//! async fn main() { +//! let s = stream! { +//! for i in 0..3 { +//! yield i; +//! } +//! }; +//! +//! pin_mut!(s); // needed for iteration +//! +//! while let Some(value) = s.next().await { +//! println!("got {}", value); +//! } +//! } +//! ``` +//! +//! Streams may be returned by using `impl Stream<Item = T>`: +//! +//! ```rust +//! use async_stream::stream; +//! +//! use futures_core::stream::Stream; +//! use futures_util::pin_mut; +//! use futures_util::stream::StreamExt; +//! +//! fn zero_to_three() -> impl Stream<Item = u32> { +//! stream! { +//! for i in 0..3 { +//! yield i; +//! } +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() { +//! let s = zero_to_three(); +//! pin_mut!(s); // needed for iteration +//! +//! while let Some(value) = s.next().await { +//! println!("got {}", value); +//! } +//! } +//! ``` +//! +//! Streams may be implemented in terms of other streams - `async-stream` provides `for await` +//! syntax to assist with this: +//! +//! ```rust +//! use async_stream::stream; +//! +//! use futures_core::stream::Stream; +//! use futures_util::pin_mut; +//! use futures_util::stream::StreamExt; +//! +//! fn zero_to_three() -> impl Stream<Item = u32> { +//! stream! { +//! for i in 0..3 { +//! yield i; +//! } +//! } +//! } +//! +//! fn double<S: Stream<Item = u32>>(input: S) +//! -> impl Stream<Item = u32> +//! { +//! stream! { +//! for await value in input { +//! yield value * 2; +//! } +//! } +//! } +//! +//! #[tokio::main] +//! async fn main() { +//! let s = double(zero_to_three()); +//! pin_mut!(s); // needed for iteration +//! +//! while let Some(value) = s.next().await { +//! println!("got {}", value); +//! } +//! } +//! ``` +//! +//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` +//! of the returned stream is `Result` with `Ok` being the value yielded and +//! `Err` the error type returned by `?`. +//! +//! ```rust +//! use tokio::net::{TcpListener, TcpStream}; +//! +//! use async_stream::try_stream; +//! use futures_core::stream::Stream; +//! +//! use std::io; +//! use std::net::SocketAddr; +//! +//! fn bind_and_accept(addr: SocketAddr) +//! -> impl Stream<Item = io::Result<TcpStream>> +//! { +//! try_stream! { +//! let mut listener = TcpListener::bind(addr).await?; +//! +//! loop { +//! let (stream, addr) = listener.accept().await?; +//! println!("received on {:?}", addr); +//! yield stream; +//! } +//! } +//! } +//! ``` +//! +//! # Implementation +//! +//! The `stream!` and `try_stream!` macros are implemented using proc macros. +//! The macro searches the syntax tree for instances of `yield $expr` and +//! transforms them into `sender.send($expr).await`. +//! +//! The stream uses a lightweight sender to send values from the stream +//! implementation to the caller. When entering the stream, an `Option<T>` is +//! stored on the stack. A pointer to the cell is stored in a thread local and +//! `poll` is called on the async block. When `poll` returns. +//! `sender.send(value)` stores the value that cell and yields back to the +//! caller. +//! +//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html + +mod async_stream; +mod next; +mod yielder; + +/// Asynchronous stream +/// +/// See [crate](index.html) documentation for more details. +/// +/// # Examples +/// +/// ``` +/// use async_stream::stream; +/// +/// use futures_util::pin_mut; +/// use futures_util::stream::StreamExt; +/// +/// #[tokio::main] +/// async fn main() { +/// let s = stream! { +/// for i in 0..3 { +/// yield i; +/// } +/// }; +/// +/// pin_mut!(s); // needed for iteration +/// +/// while let Some(value) = s.next().await { +/// println!("got {}", value); +/// } +/// } +/// ``` +#[macro_export] +macro_rules! stream { + ($($tt:tt)*) => { + $crate::__private::stream_inner!(($crate) $($tt)*) + } +} + +/// Asynchronous fallible stream +/// +/// See [crate](index.html) documentation for more details. +/// +/// # Examples +/// +/// ``` +/// use tokio::net::{TcpListener, TcpStream}; +/// +/// use async_stream::try_stream; +/// use futures_core::stream::Stream; +/// +/// use std::io; +/// use std::net::SocketAddr; +/// +/// fn bind_and_accept(addr: SocketAddr) +/// -> impl Stream<Item = io::Result<TcpStream>> +/// { +/// try_stream! { +/// let mut listener = TcpListener::bind(addr).await?; +/// +/// loop { +/// let (stream, addr) = listener.accept().await?; +/// println!("received on {:?}", addr); +/// yield stream; +/// } +/// } +/// } +/// ``` +#[macro_export] +macro_rules! try_stream { + ($($tt:tt)*) => { + $crate::__private::try_stream_inner!(($crate) $($tt)*) + } +} + +// Not public API. +#[doc(hidden)] +pub mod __private { + pub use crate::async_stream::AsyncStream; + pub use crate::next::next; + pub use async_stream_impl::{stream_inner, try_stream_inner}; + pub mod yielder { + pub use crate::yielder::pair; + } +} diff --git a/vendor/async-stream/src/next.rs b/vendor/async-stream/src/next.rs new file mode 100644 index 00000000..7b1e0467 --- /dev/null +++ b/vendor/async-stream/src/next.rs @@ -0,0 +1,32 @@ +use futures_core::Stream; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +// This is equivalent to the `futures::StreamExt::next` method. +// But we want to make this crate dependency as small as possible, so we define our `next` function. +#[doc(hidden)] +pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_ +where + S: Stream + Unpin, +{ + Next { stream } +} + +#[derive(Debug)] +struct Next<'a, S> { + stream: &'a mut S, +} + +impl<S> Unpin for Next<'_, S> where S: Unpin {} + +impl<S> Future for Next<'_, S> +where + S: Stream + Unpin, +{ + type Output = Option<S::Item>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.stream).poll_next(cx) + } +} diff --git a/vendor/async-stream/src/yielder.rs b/vendor/async-stream/src/yielder.rs new file mode 100644 index 00000000..4b7a9442 --- /dev/null +++ b/vendor/async-stream/src/yielder.rs @@ -0,0 +1,94 @@ +use std::cell::Cell; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::ptr; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub struct Sender<T> { + _p: PhantomData<fn(T) -> T>, +} + +#[derive(Debug)] +pub struct Receiver<T> { + _p: PhantomData<T>, +} + +pub(crate) struct Enter<'a, T> { + _rx: &'a mut Receiver<T>, + prev: *mut (), +} + +// Note: It is considered unsound for anyone other than our macros to call +// this function. This is a private API intended only for calls from our +// macros, and users should never call it, but some people tend to +// misinterpret it as fine to call unless it is marked unsafe. +#[doc(hidden)] +pub unsafe fn pair<T>() -> (Sender<T>, Receiver<T>) { + let tx = Sender { _p: PhantomData }; + let rx = Receiver { _p: PhantomData }; + (tx, rx) +} + +// Tracks the pointer to `Option<T>`. +// +// TODO: Ensure wakers match? +thread_local!(static STORE: Cell<*mut ()> = const { Cell::new(ptr::null_mut()) }); + +// ===== impl Sender ===== + +impl<T> Sender<T> { + pub fn send(&mut self, value: T) -> impl Future<Output = ()> { + Send { value: Some(value) } + } +} + +struct Send<T> { + value: Option<T>, +} + +impl<T> Unpin for Send<T> {} + +impl<T> Future for Send<T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + if self.value.is_none() { + return Poll::Ready(()); + } + + STORE.with(|cell| { + let ptr = cell.get() as *mut Option<T>; + let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage"); + + if option_ref.is_none() { + *option_ref = self.value.take(); + } + + Poll::Pending + }) + } +} + +// ===== impl Receiver ===== + +impl<T> Receiver<T> { + pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option<T>) -> Enter<'a, T> { + let prev = STORE.with(|cell| { + let prev = cell.get(); + cell.set(dst as *mut _ as *mut ()); + prev + }); + + Enter { _rx: self, prev } + } +} + +// ===== impl Enter ===== + +impl<'a, T> Drop for Enter<'a, T> { + fn drop(&mut self) { + STORE.with(|cell| cell.set(self.prev)); + } +} |
