summaryrefslogtreecommitdiff
path: root/vendor/tower/tests/buffer
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
committermo khan <mo@mokhan.ca>2025-07-02 18:36:06 -0600
commit8cdfa445d6629ffef4cb84967ff7017654045bc2 (patch)
tree22f0b0907c024c78d26a731e2e1f5219407d8102 /vendor/tower/tests/buffer
parent4351c74c7c5f97156bc94d3a8549b9940ac80e3f (diff)
chore: add vendor directory
Diffstat (limited to 'vendor/tower/tests/buffer')
-rw-r--r--vendor/tower/tests/buffer/main.rs459
1 files changed, 459 insertions, 0 deletions
diff --git a/vendor/tower/tests/buffer/main.rs b/vendor/tower/tests/buffer/main.rs
new file mode 100644
index 00000000..ee238f11
--- /dev/null
+++ b/vendor/tower/tests/buffer/main.rs
@@ -0,0 +1,459 @@
+#![cfg(feature = "buffer")]
+#[path = "../support.rs"]
+mod support;
+use std::thread;
+use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
+use tower::buffer::{error, Buffer};
+use tower::{util::ServiceExt, Service};
+use tower_test::{assert_request_eq, mock};
+
+fn let_worker_work() {
+ // Allow the Buffer's executor to do work
+ thread::sleep(::std::time::Duration::from_millis(100));
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn req_and_res() {
+ let _t = support::trace_init();
+
+ let (mut service, mut handle) = new_service();
+
+ assert_ready_ok!(service.poll_ready());
+ let mut response = task::spawn(service.call("hello"));
+
+ assert_request_eq!(handle, "hello").send_response("world");
+
+ let_worker_work();
+ assert_eq!(assert_ready_ok!(response.poll()), "world");
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn clears_canceled_requests() {
+ let _t = support::trace_init();
+
+ let (mut service, mut handle) = new_service();
+
+ handle.allow(1);
+
+ assert_ready_ok!(service.poll_ready());
+ let mut res1 = task::spawn(service.call("hello"));
+
+ let send_response1 = assert_request_eq!(handle, "hello");
+
+ // don't respond yet, new requests will get buffered
+ assert_ready_ok!(service.poll_ready());
+ let res2 = task::spawn(service.call("hello2"));
+
+ assert_pending!(handle.poll_request());
+
+ assert_ready_ok!(service.poll_ready());
+ let mut res3 = task::spawn(service.call("hello3"));
+
+ drop(res2);
+
+ send_response1.send_response("world");
+
+ let_worker_work();
+ assert_eq!(assert_ready_ok!(res1.poll()), "world");
+
+ // res2 was dropped, so it should have been canceled in the buffer
+ handle.allow(1);
+
+ assert_request_eq!(handle, "hello3").send_response("world3");
+
+ let_worker_work();
+ assert_eq!(assert_ready_ok!(res3.poll()), "world3");
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn when_inner_is_not_ready() {
+ let _t = support::trace_init();
+
+ let (mut service, mut handle) = new_service();
+
+ // Make the service NotReady
+ handle.allow(0);
+
+ assert_ready_ok!(service.poll_ready());
+ let mut res1 = task::spawn(service.call("hello"));
+
+ let_worker_work();
+ assert_pending!(res1.poll());
+ assert_pending!(handle.poll_request());
+
+ handle.allow(1);
+
+ assert_request_eq!(handle, "hello").send_response("world");
+
+ let_worker_work();
+ assert_eq!(assert_ready_ok!(res1.poll()), "world");
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn when_inner_fails() {
+ use std::error::Error as StdError;
+ let _t = support::trace_init();
+
+ let (mut service, mut handle) = new_service();
+
+ // Make the service NotReady
+ handle.allow(0);
+ handle.send_error("foobar");
+
+ assert_ready_ok!(service.poll_ready());
+ let mut res1 = task::spawn(service.call("hello"));
+
+ let_worker_work();
+ let e = assert_ready_err!(res1.poll());
+ if let Some(e) = e.downcast_ref::<error::ServiceError>() {
+ let e = e.source().unwrap();
+
+ assert_eq!(e.to_string(), "foobar");
+ } else {
+ panic!("unexpected error type: {:?}", e);
+ }
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn poll_ready_when_worker_is_dropped_early() {
+ let _t = support::trace_init();
+
+ let (service, _handle) = mock::pair::<(), ()>();
+
+ let (service, worker) = Buffer::pair(service, 1);
+
+ let mut service = mock::Spawn::new(service);
+
+ drop(worker);
+
+ let err = assert_ready_err!(service.poll_ready());
+
+ assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn response_future_when_worker_is_dropped_early() {
+ let _t = support::trace_init();
+
+ let (service, mut handle) = mock::pair::<_, ()>();
+
+ let (service, worker) = Buffer::pair(service, 1);
+
+ let mut service = mock::Spawn::new(service);
+
+ // keep the request in the worker
+ handle.allow(0);
+ assert_ready_ok!(service.poll_ready());
+ let mut response = task::spawn(service.call("hello"));
+
+ drop(worker);
+
+ let_worker_work();
+ let err = assert_ready_err!(response.poll());
+ assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn waits_for_channel_capacity() {
+ let _t = support::trace_init();
+
+ let (service, mut handle) = mock::pair::<&'static str, &'static str>();
+
+ let (service, worker) = Buffer::pair(service, 2);
+
+ let mut service = mock::Spawn::new(service);
+ let mut worker = task::spawn(worker);
+
+ // keep requests in the worker
+ handle.allow(0);
+ assert_ready_ok!(service.poll_ready());
+ let mut response1 = task::spawn(service.call("hello"));
+ assert_pending!(worker.poll());
+
+ assert_ready_ok!(service.poll_ready());
+ let mut response2 = task::spawn(service.call("hello"));
+ assert_pending!(worker.poll());
+
+ assert_ready_ok!(service.poll_ready());
+ let mut response3 = task::spawn(service.call("hello"));
+ assert_pending!(service.poll_ready());
+ assert_pending!(worker.poll());
+
+ handle.allow(1);
+ assert_pending!(worker.poll());
+
+ handle
+ .next_request()
+ .await
+ .unwrap()
+ .1
+ .send_response("world");
+ assert_pending!(worker.poll());
+ assert_ready_ok!(response1.poll());
+
+ assert_ready_ok!(service.poll_ready());
+ let mut response4 = task::spawn(service.call("hello"));
+ assert_pending!(worker.poll());
+
+ handle.allow(3);
+ assert_pending!(worker.poll());
+
+ handle
+ .next_request()
+ .await
+ .unwrap()
+ .1
+ .send_response("world");
+ assert_pending!(worker.poll());
+ assert_ready_ok!(response2.poll());
+
+ assert_pending!(worker.poll());
+ handle
+ .next_request()
+ .await
+ .unwrap()
+ .1
+ .send_response("world");
+ assert_pending!(worker.poll());
+ assert_ready_ok!(response3.poll());
+
+ assert_pending!(worker.poll());
+ handle
+ .next_request()
+ .await
+ .unwrap()
+ .1
+ .send_response("world");
+ assert_pending!(worker.poll());
+ assert_ready_ok!(response4.poll());
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn wakes_pending_waiters_on_close() {
+ let _t = support::trace_init();
+
+ let (service, mut handle) = mock::pair::<_, ()>();
+
+ let (mut service, worker) = Buffer::pair(service, 1);
+ let mut worker = task::spawn(worker);
+
+ // keep the request in the worker
+ handle.allow(0);
+ let service1 = service.ready().await.unwrap();
+ assert_pending!(worker.poll());
+ let mut response = task::spawn(service1.call("hello"));
+
+ assert!(worker.is_woken(), "worker task should be woken by request");
+ assert_pending!(worker.poll());
+
+ // fill the channel so all subsequent requests will wait for capacity
+ let service1 = assert_ready_ok!(task::spawn(service.ready()).poll());
+ assert_pending!(worker.poll());
+ let mut response2 = task::spawn(service1.call("world"));
+
+ let mut service1 = service.clone();
+ let mut ready1 = task::spawn(service1.ready());
+ assert_pending!(worker.poll());
+ assert_pending!(ready1.poll(), "no capacity");
+
+ let mut service1 = service.clone();
+ let mut ready2 = task::spawn(service1.ready());
+ assert_pending!(worker.poll());
+ assert_pending!(ready2.poll(), "no capacity");
+
+ // kill the worker task
+ drop(worker);
+
+ let err = assert_ready_err!(response.poll());
+ assert!(
+ err.is::<error::Closed>(),
+ "response should fail with a Closed, got: {:?}",
+ err
+ );
+
+ let err = assert_ready_err!(response2.poll());
+ assert!(
+ err.is::<error::Closed>(),
+ "response should fail with a Closed, got: {:?}",
+ err
+ );
+
+ assert!(
+ ready1.is_woken(),
+ "dropping worker should wake ready task 1"
+ );
+ let err = assert_ready_err!(ready1.poll());
+ assert!(
+ err.is::<error::Closed>(),
+ "ready 1 should fail with a Closed, got: {:?}",
+ err
+ );
+
+ assert!(
+ ready2.is_woken(),
+ "dropping worker should wake ready task 2"
+ );
+ let err = assert_ready_err!(ready1.poll());
+ assert!(
+ err.is::<error::Closed>(),
+ "ready 2 should fail with a Closed, got: {:?}",
+ err
+ );
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn wakes_pending_waiters_on_failure() {
+ let _t = support::trace_init();
+
+ let (service, mut handle) = mock::pair::<_, ()>();
+
+ let (mut service, worker) = Buffer::pair(service, 1);
+ let mut worker = task::spawn(worker);
+
+ // keep the request in the worker
+ handle.allow(0);
+ let service1 = service.ready().await.unwrap();
+ assert_pending!(worker.poll());
+ let mut response = task::spawn(service1.call("hello"));
+
+ assert!(worker.is_woken(), "worker task should be woken by request");
+ assert_pending!(worker.poll());
+
+ // fill the channel so all subsequent requests will wait for capacity
+ let service1 = assert_ready_ok!(task::spawn(service.ready()).poll());
+ assert_pending!(worker.poll());
+ let mut response2 = task::spawn(service1.call("world"));
+
+ let mut service1 = service.clone();
+ let mut ready1 = task::spawn(service1.ready());
+ assert_pending!(worker.poll());
+ assert_pending!(ready1.poll(), "no capacity");
+
+ let mut service1 = service.clone();
+ let mut ready2 = task::spawn(service1.ready());
+ assert_pending!(worker.poll());
+ assert_pending!(ready2.poll(), "no capacity");
+
+ // fail the inner service
+ handle.send_error("foobar");
+ // worker task terminates
+ assert_ready!(worker.poll());
+
+ let err = assert_ready_err!(response.poll());
+ assert!(
+ err.is::<error::ServiceError>(),
+ "response should fail with a ServiceError, got: {:?}",
+ err
+ );
+ let err = assert_ready_err!(response2.poll());
+ assert!(
+ err.is::<error::ServiceError>(),
+ "response should fail with a ServiceError, got: {:?}",
+ err
+ );
+
+ assert!(
+ ready1.is_woken(),
+ "dropping worker should wake ready task 1"
+ );
+ let err = assert_ready_err!(ready1.poll());
+ assert!(
+ err.is::<error::ServiceError>(),
+ "ready 1 should fail with a ServiceError, got: {:?}",
+ err
+ );
+
+ assert!(
+ ready2.is_woken(),
+ "dropping worker should wake ready task 2"
+ );
+ let err = assert_ready_err!(ready1.poll());
+ assert!(
+ err.is::<error::ServiceError>(),
+ "ready 2 should fail with a ServiceError, got: {:?}",
+ err
+ );
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn propagates_trace_spans() {
+ use tower::util::ServiceExt;
+ use tracing::Instrument;
+
+ let _t = support::trace_init();
+
+ let span = tracing::info_span!("my_span");
+
+ let service = support::AssertSpanSvc::new(span.clone());
+ let (service, worker) = Buffer::pair(service, 5);
+ let worker = tokio::spawn(worker);
+
+ let result = tokio::spawn(service.oneshot(()).instrument(span));
+
+ result.await.expect("service panicked").expect("failed");
+ worker.await.expect("worker panicked");
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn doesnt_leak_permits() {
+ let _t = support::trace_init();
+
+ let (service, mut handle) = mock::pair::<_, ()>();
+
+ let (mut service1, worker) = Buffer::pair(service, 2);
+ let mut worker = task::spawn(worker);
+ let mut service2 = service1.clone();
+ let mut service3 = service1.clone();
+
+ // Attempt to poll the first clone of the buffer to readiness multiple
+ // times. These should all succeed, because the readiness is never
+ // *consumed* --- no request is sent.
+ assert_ready_ok!(task::spawn(service1.ready()).poll());
+ assert_ready_ok!(task::spawn(service1.ready()).poll());
+ assert_ready_ok!(task::spawn(service1.ready()).poll());
+
+ // It should also be possible to drive the second clone of the service to
+ // readiness --- it should only acquire one permit, as well.
+ assert_ready_ok!(task::spawn(service2.ready()).poll());
+ assert_ready_ok!(task::spawn(service2.ready()).poll());
+ assert_ready_ok!(task::spawn(service2.ready()).poll());
+
+ // The third clone *doesn't* poll ready, because the first two clones have
+ // each acquired one permit.
+ let mut ready3 = task::spawn(service3.ready());
+ assert_pending!(ready3.poll());
+
+ // Consume the first service's readiness.
+ let mut response = task::spawn(service1.call(()));
+ handle.allow(1);
+ assert_pending!(worker.poll());
+
+ handle.next_request().await.unwrap().1.send_response(());
+ assert_pending!(worker.poll());
+ assert_ready_ok!(response.poll());
+
+ // Now, the third service should acquire a permit...
+ assert!(ready3.is_woken());
+ assert_ready_ok!(ready3.poll());
+}
+
+type Handle = mock::Handle<&'static str, &'static str>;
+type MockBuffer = Buffer<&'static str, mock::future::ResponseFuture<&'static str>>;
+
+fn new_service() -> (mock::Spawn<MockBuffer>, Handle) {
+ // bound is >0 here because clears_canceled_requests needs multiple outstanding requests
+ new_service_with_bound(10)
+}
+
+fn new_service_with_bound(bound: usize) -> (mock::Spawn<MockBuffer>, Handle) {
+ mock::spawn_with(|s| {
+ let (svc, worker) = Buffer::pair(s, bound);
+
+ thread::spawn(move || {
+ let mut fut = tokio_test::task::spawn(worker);
+ while fut.poll().is_pending() {}
+ });
+
+ svc
+ })
+}