diff options
Diffstat (limited to 'vendor/async-stream/tests/stream.rs')
| -rw-r--r-- | vendor/async-stream/tests/stream.rs | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/vendor/async-stream/tests/stream.rs b/vendor/async-stream/tests/stream.rs new file mode 100644 index 00000000..4e26a3d1 --- /dev/null +++ b/vendor/async-stream/tests/stream.rs @@ -0,0 +1,237 @@ +use async_stream::stream; + +use futures_core::stream::{FusedStream, Stream}; +use futures_util::pin_mut; +use futures_util::stream::StreamExt; +use tokio::sync::mpsc; +use tokio_test::assert_ok; + +#[tokio::test] +async fn noop_stream() { + let s = stream! {}; + pin_mut!(s); + + while s.next().await.is_some() { + unreachable!(); + } +} + +#[tokio::test] +async fn empty_stream() { + let mut ran = false; + + { + let r = &mut ran; + let s = stream! { + *r = true; + println!("hello world!"); + }; + pin_mut!(s); + + while s.next().await.is_some() { + unreachable!(); + } + } + + assert!(ran); +} + +#[tokio::test] +async fn yield_single_value() { + let s = stream! { + yield "hello"; + }; + + let values: Vec<_> = s.collect().await; + + assert_eq!(1, values.len()); + assert_eq!("hello", values[0]); +} + +#[tokio::test] +async fn fused() { + let s = stream! { + yield "hello"; + }; + pin_mut!(s); + + assert!(!s.is_terminated()); + assert_eq!(s.next().await, Some("hello")); + assert_eq!(s.next().await, None); + + assert!(s.is_terminated()); + // This should return None from now on + assert_eq!(s.next().await, None); +} + +#[tokio::test] +async fn yield_multi_value() { + let s = stream! { + yield "hello"; + yield "world"; + yield "dizzy"; + }; + + let values: Vec<_> = s.collect().await; + + assert_eq!(3, values.len()); + assert_eq!("hello", values[0]); + assert_eq!("world", values[1]); + assert_eq!("dizzy", values[2]); +} + +#[tokio::test] +async fn unit_yield_in_select() { + use tokio::select; + + async fn do_stuff_async() {} + + let s = stream! { + select! { + _ = do_stuff_async() => yield, + else => yield, + } + }; + + let values: Vec<_> = s.collect().await; + assert_eq!(values.len(), 1); +} + +#[tokio::test] +async fn yield_with_select() { + use tokio::select; + + async fn do_stuff_async() {} + async fn more_async_work() {} + + let s = stream! { + select! { + _ = do_stuff_async() => yield "hey", + _ = more_async_work() => yield "hey", + else => yield "hey", + } + }; + + let values: Vec<_> = s.collect().await; + assert_eq!(values, vec!["hey"]); +} + +#[tokio::test] +async fn return_stream() { + fn build_stream() -> impl Stream<Item = u32> { + stream! { + yield 1; + yield 2; + yield 3; + } + } + + let s = build_stream(); + + let values: Vec<_> = s.collect().await; + assert_eq!(3, values.len()); + assert_eq!(1, values[0]); + assert_eq!(2, values[1]); + assert_eq!(3, values[2]); +} + +#[tokio::test] +async fn consume_channel() { + let (tx, mut rx) = mpsc::channel(10); + + let s = stream! { + while let Some(v) = rx.recv().await { + yield v; + } + }; + + pin_mut!(s); + + for i in 0..3 { + assert_ok!(tx.send(i).await); + assert_eq!(Some(i), s.next().await); + } + + drop(tx); + assert_eq!(None, s.next().await); +} + +#[tokio::test] +async fn borrow_self() { + struct Data(String); + + impl Data { + fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a { + stream! { + yield &self.0[..]; + } + } + } + + let data = Data("hello".to_string()); + let s = data.stream(); + pin_mut!(s); + + assert_eq!(Some("hello"), s.next().await); +} + +#[tokio::test] +async fn stream_in_stream() { + let s = stream! { + let s = stream! { + for i in 0..3 { + yield i; + } + }; + + pin_mut!(s); + while let Some(v) = s.next().await { + yield v; + } + }; + + let values: Vec<_> = s.collect().await; + assert_eq!(3, values.len()); +} + +#[tokio::test] +async fn yield_non_unpin_value() { + let s: Vec<_> = stream! { + for i in 0..3 { + yield async move { i }; + } + } + .buffered(1) + .collect() + .await; + + assert_eq!(s, vec![0, 1, 2]); +} + +#[test] +fn inner_try_stream() { + use async_stream::try_stream; + use tokio::select; + + async fn do_stuff_async() {} + + let _ = stream! { + select! { + _ = do_stuff_async() => { + let another_s = try_stream! { + yield; + }; + let _: Result<(), ()> = Box::pin(another_s).next().await.unwrap(); + }, + else => {}, + } + yield + }; +} + +#[rustversion::attr(not(stable), ignore)] +#[test] +fn test() { + let t = trybuild::TestCases::new(); + t.compile_fail("tests/ui/*.rs"); +} |
