summaryrefslogtreecommitdiff
path: root/vendor/async-stream/tests/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/async-stream/tests/stream.rs')
-rw-r--r--vendor/async-stream/tests/stream.rs237
1 files changed, 237 insertions, 0 deletions
diff --git a/vendor/async-stream/tests/stream.rs b/vendor/async-stream/tests/stream.rs
new file mode 100644
index 00000000..4e26a3d1
--- /dev/null
+++ b/vendor/async-stream/tests/stream.rs
@@ -0,0 +1,237 @@
+use async_stream::stream;
+
+use futures_core::stream::{FusedStream, Stream};
+use futures_util::pin_mut;
+use futures_util::stream::StreamExt;
+use tokio::sync::mpsc;
+use tokio_test::assert_ok;
+
+#[tokio::test]
+async fn noop_stream() {
+ let s = stream! {};
+ pin_mut!(s);
+
+ while s.next().await.is_some() {
+ unreachable!();
+ }
+}
+
+#[tokio::test]
+async fn empty_stream() {
+ let mut ran = false;
+
+ {
+ let r = &mut ran;
+ let s = stream! {
+ *r = true;
+ println!("hello world!");
+ };
+ pin_mut!(s);
+
+ while s.next().await.is_some() {
+ unreachable!();
+ }
+ }
+
+ assert!(ran);
+}
+
+#[tokio::test]
+async fn yield_single_value() {
+ let s = stream! {
+ yield "hello";
+ };
+
+ let values: Vec<_> = s.collect().await;
+
+ assert_eq!(1, values.len());
+ assert_eq!("hello", values[0]);
+}
+
+#[tokio::test]
+async fn fused() {
+ let s = stream! {
+ yield "hello";
+ };
+ pin_mut!(s);
+
+ assert!(!s.is_terminated());
+ assert_eq!(s.next().await, Some("hello"));
+ assert_eq!(s.next().await, None);
+
+ assert!(s.is_terminated());
+ // This should return None from now on
+ assert_eq!(s.next().await, None);
+}
+
+#[tokio::test]
+async fn yield_multi_value() {
+ let s = stream! {
+ yield "hello";
+ yield "world";
+ yield "dizzy";
+ };
+
+ let values: Vec<_> = s.collect().await;
+
+ assert_eq!(3, values.len());
+ assert_eq!("hello", values[0]);
+ assert_eq!("world", values[1]);
+ assert_eq!("dizzy", values[2]);
+}
+
+#[tokio::test]
+async fn unit_yield_in_select() {
+ use tokio::select;
+
+ async fn do_stuff_async() {}
+
+ let s = stream! {
+ select! {
+ _ = do_stuff_async() => yield,
+ else => yield,
+ }
+ };
+
+ let values: Vec<_> = s.collect().await;
+ assert_eq!(values.len(), 1);
+}
+
+#[tokio::test]
+async fn yield_with_select() {
+ use tokio::select;
+
+ async fn do_stuff_async() {}
+ async fn more_async_work() {}
+
+ let s = stream! {
+ select! {
+ _ = do_stuff_async() => yield "hey",
+ _ = more_async_work() => yield "hey",
+ else => yield "hey",
+ }
+ };
+
+ let values: Vec<_> = s.collect().await;
+ assert_eq!(values, vec!["hey"]);
+}
+
+#[tokio::test]
+async fn return_stream() {
+ fn build_stream() -> impl Stream<Item = u32> {
+ stream! {
+ yield 1;
+ yield 2;
+ yield 3;
+ }
+ }
+
+ let s = build_stream();
+
+ let values: Vec<_> = s.collect().await;
+ assert_eq!(3, values.len());
+ assert_eq!(1, values[0]);
+ assert_eq!(2, values[1]);
+ assert_eq!(3, values[2]);
+}
+
+#[tokio::test]
+async fn consume_channel() {
+ let (tx, mut rx) = mpsc::channel(10);
+
+ let s = stream! {
+ while let Some(v) = rx.recv().await {
+ yield v;
+ }
+ };
+
+ pin_mut!(s);
+
+ for i in 0..3 {
+ assert_ok!(tx.send(i).await);
+ assert_eq!(Some(i), s.next().await);
+ }
+
+ drop(tx);
+ assert_eq!(None, s.next().await);
+}
+
+#[tokio::test]
+async fn borrow_self() {
+ struct Data(String);
+
+ impl Data {
+ fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
+ stream! {
+ yield &self.0[..];
+ }
+ }
+ }
+
+ let data = Data("hello".to_string());
+ let s = data.stream();
+ pin_mut!(s);
+
+ assert_eq!(Some("hello"), s.next().await);
+}
+
+#[tokio::test]
+async fn stream_in_stream() {
+ let s = stream! {
+ let s = stream! {
+ for i in 0..3 {
+ yield i;
+ }
+ };
+
+ pin_mut!(s);
+ while let Some(v) = s.next().await {
+ yield v;
+ }
+ };
+
+ let values: Vec<_> = s.collect().await;
+ assert_eq!(3, values.len());
+}
+
+#[tokio::test]
+async fn yield_non_unpin_value() {
+ let s: Vec<_> = stream! {
+ for i in 0..3 {
+ yield async move { i };
+ }
+ }
+ .buffered(1)
+ .collect()
+ .await;
+
+ assert_eq!(s, vec![0, 1, 2]);
+}
+
+#[test]
+fn inner_try_stream() {
+ use async_stream::try_stream;
+ use tokio::select;
+
+ async fn do_stuff_async() {}
+
+ let _ = stream! {
+ select! {
+ _ = do_stuff_async() => {
+ let another_s = try_stream! {
+ yield;
+ };
+ let _: Result<(), ()> = Box::pin(another_s).next().await.unwrap();
+ },
+ else => {},
+ }
+ yield
+ };
+}
+
+#[rustversion::attr(not(stable), ignore)]
+#[test]
+fn test() {
+ let t = trybuild::TestCases::new();
+ t.compile_fail("tests/ui/*.rs");
+}