summaryrefslogtreecommitdiff
path: root/vendor/hyper-util/tests
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-15 16:37:08 -0600
committermo khan <mo@mokhan.ca>2025-07-17 16:30:22 -0600
commit45df4d0d9b577fecee798d672695fe24ff57fb1b (patch)
tree1b99bf645035b58e0d6db08c7a83521f41f7a75b /vendor/hyper-util/tests
parentf94f79608393d4ab127db63cc41668445ef6b243 (diff)
feat: migrate from Cedar to SpiceDB authorization system
This is a major architectural change that replaces the Cedar policy-based authorization system with SpiceDB's relation-based authorization. Key changes: - Migrate from Rust to Go implementation - Replace Cedar policies with SpiceDB schema and relationships - Switch from envoy `ext_authz` with Cedar to SpiceDB permission checks - Update build system and dependencies for Go ecosystem - Maintain Envoy integration for external authorization This change enables more flexible permission modeling through SpiceDB's Google Zanzibar inspired relation-based system, supporting complex hierarchical permissions that were difficult to express in Cedar. Breaking change: Existing Cedar policies and Rust-based configuration will no longer work and need to be migrated to SpiceDB schema.
Diffstat (limited to 'vendor/hyper-util/tests')
-rw-r--r--vendor/hyper-util/tests/legacy_client.rs1488
-rw-r--r--vendor/hyper-util/tests/proxy.rs478
-rw-r--r--vendor/hyper-util/tests/test_utils/mod.rs175
3 files changed, 0 insertions, 2141 deletions
diff --git a/vendor/hyper-util/tests/legacy_client.rs b/vendor/hyper-util/tests/legacy_client.rs
deleted file mode 100644
index bdac5e0e..00000000
--- a/vendor/hyper-util/tests/legacy_client.rs
+++ /dev/null
@@ -1,1488 +0,0 @@
-mod test_utils;
-
-use std::io::{Read, Write};
-use std::net::{SocketAddr, TcpListener};
-use std::pin::Pin;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::task::Poll;
-use std::thread;
-use std::time::Duration;
-
-use futures_channel::{mpsc, oneshot};
-use futures_util::future::{self, FutureExt, TryFutureExt};
-use futures_util::stream::StreamExt;
-use futures_util::{self, Stream};
-use http_body_util::BodyExt;
-use http_body_util::{Empty, Full, StreamBody};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-
-use hyper::body::Bytes;
-use hyper::body::Frame;
-use hyper::Request;
-use hyper_util::client::legacy::connect::{capture_connection, HttpConnector};
-use hyper_util::client::legacy::Client;
-use hyper_util::rt::{TokioExecutor, TokioIo};
-
-use test_utils::{DebugConnector, DebugStream};
-
-pub fn runtime() -> tokio::runtime::Runtime {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("new rt")
-}
-
-fn s(buf: &[u8]) -> &str {
- std::str::from_utf8(buf).expect("from_utf8")
-}
-
-#[cfg(not(miri))]
-#[test]
-fn drop_body_before_eof_closes_connection() {
- // https://github.com/hyperium/hyper/issues/1353
- let _ = pretty_env_logger::try_init();
-
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let rt = runtime();
- let (closes_tx, closes) = mpsc::channel::<()>(10);
- let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(
- DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx),
- );
- let (tx1, rx1) = oneshot::channel();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- let body = vec![b'x'; 1024 * 128];
- write!(
- sock,
- "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
- body.len()
- )
- .expect("write head");
- let _ = sock.write_all(&body);
- let _ = tx1.send(());
- });
-
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req).map_ok(move |res| {
- assert_eq!(res.status(), hyper::StatusCode::OK);
- });
- let rx = rx1;
- rt.block_on(async move {
- let (res, _) = future::join(res, rx).await;
- res.unwrap();
- tokio::time::sleep(Duration::from_secs(1)).await;
- });
- rt.block_on(closes.into_future()).0.expect("closes");
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn drop_client_closes_idle_connections() {
- let _ = pretty_env_logger::try_init();
-
- let server = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
- let addr = server.local_addr().unwrap();
- let (closes_tx, mut closes) = mpsc::channel(10);
-
- let (tx1, rx1) = oneshot::channel();
-
- let t1 = tokio::spawn(async move {
- let mut sock = server.accept().await.unwrap().0;
- let mut buf = [0; 4096];
- sock.read(&mut buf).await.expect("read 1");
- let body = [b'x'; 64];
- let headers = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len());
- sock.write_all(headers.as_bytes())
- .await
- .expect("write head");
- sock.write_all(&body).await.expect("write body");
- let _ = tx1.send(());
-
- // prevent this thread from closing until end of test, so the connection
- // stays open and idle until Client is dropped
- if let Ok(n) = sock.read(&mut buf).await {
- assert_eq!(n, 0);
- }
- });
-
- let client = Client::builder(TokioExecutor::new()).build(DebugConnector::with_http_and_closes(
- HttpConnector::new(),
- closes_tx,
- ));
-
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req).map_ok(move |res| {
- assert_eq!(res.status(), hyper::StatusCode::OK);
- });
- let rx = rx1;
- let (res, _) = future::join(res, rx).await;
- res.unwrap();
-
- // not closed yet, just idle
- future::poll_fn(|ctx| {
- assert!(Pin::new(&mut closes).poll_next(ctx).is_pending());
- Poll::Ready(())
- })
- .await;
-
- // drop to start the connections closing
- drop(client);
-
- // and wait a few ticks for the connections to close
- let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
- futures_util::pin_mut!(t);
- let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
- future::select(t, close).await;
- t1.await.unwrap();
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn drop_response_future_closes_in_progress_connection() {
- let _ = pretty_env_logger::try_init();
-
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let (closes_tx, closes) = mpsc::channel(10);
-
- let (tx1, rx1) = oneshot::channel();
- let (_client_drop_tx, client_drop_rx) = std::sync::mpsc::channel::<()>();
-
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- // we never write a response head
- // simulates a slow server operation
- let _ = tx1.send(());
-
- // prevent this thread from closing until end of test, so the connection
- // stays open and idle until Client is dropped
- let _ = client_drop_rx.recv();
- });
-
- let res = {
- let client = Client::builder(TokioExecutor::new()).build(
- DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx),
- );
-
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- client.request(req).map(|_| unreachable!())
- };
-
- future::select(res, rx1).await;
-
- // res now dropped
- let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
- futures_util::pin_mut!(t);
- let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
- future::select(t, close).await;
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn drop_response_body_closes_in_progress_connection() {
- let _ = pretty_env_logger::try_init();
-
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let (closes_tx, closes) = mpsc::channel(10);
-
- let (tx1, rx1) = oneshot::channel();
- let (_client_drop_tx, client_drop_rx) = std::sync::mpsc::channel::<()>();
-
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- write!(
- sock,
- "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n"
- )
- .expect("write head");
- let _ = tx1.send(());
-
- // prevent this thread from closing until end of test, so the connection
- // stays open and idle until Client is dropped
- let _ = client_drop_rx.recv();
- });
-
- let rx = rx1;
- let res = {
- let client = Client::builder(TokioExecutor::new()).build(
- DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx),
- );
-
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- // notably, haven't read body yet
- client.request(req)
- };
-
- let (res, _) = future::join(res, rx).await;
- // drop the body
- res.unwrap();
-
- // and wait a few ticks to see the connection drop
- let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
- futures_util::pin_mut!(t);
- let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
- future::select(t, close).await;
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn no_keep_alive_closes_connection() {
- // https://github.com/hyperium/hyper/issues/1383
- let _ = pretty_env_logger::try_init();
-
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let (closes_tx, closes) = mpsc::channel(10);
-
- let (tx1, rx1) = oneshot::channel();
- let (_tx2, rx2) = std::sync::mpsc::channel::<()>();
-
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .unwrap();
- let _ = tx1.send(());
-
- // prevent this thread from closing until end of test, so the connection
- // stays open and idle until Client is dropped
- let _ = rx2.recv();
- });
-
- let client = Client::builder(TokioExecutor::new())
- .pool_max_idle_per_host(0)
- .build(DebugConnector::with_http_and_closes(
- HttpConnector::new(),
- closes_tx,
- ));
-
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req).map_ok(move |res| {
- assert_eq!(res.status(), hyper::StatusCode::OK);
- });
- let rx = rx1;
- let (res, _) = future::join(res, rx).await;
- res.unwrap();
-
- let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
- futures_util::pin_mut!(t);
- let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
- future::select(close, t).await;
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn socket_disconnect_closes_idle_conn() {
- // notably when keep-alive is enabled
- let _ = pretty_env_logger::try_init();
-
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let (closes_tx, closes) = mpsc::channel(10);
-
- let (tx1, rx1) = oneshot::channel();
-
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .unwrap();
- let _ = tx1.send(());
- });
-
- let client = Client::builder(TokioExecutor::new()).build(DebugConnector::with_http_and_closes(
- HttpConnector::new(),
- closes_tx,
- ));
-
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req).map_ok(move |res| {
- assert_eq!(res.status(), hyper::StatusCode::OK);
- });
- let rx = rx1;
-
- let (res, _) = future::join(res, rx).await;
- res.unwrap();
-
- let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
- futures_util::pin_mut!(t);
- let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
- future::select(t, close).await;
-}
-
-#[cfg(not(miri))]
-#[test]
-fn connect_call_is_lazy() {
- // We especially don't want connects() triggered if there's
- // idle connections that the Checkout would have found
- let _ = pretty_env_logger::try_init();
-
- let _rt = runtime();
- let connector = DebugConnector::new();
- let connects = connector.connects.clone();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- assert_eq!(connects.load(Ordering::Relaxed), 0);
- let req = Request::builder()
- .uri("http://hyper.local/a")
- .body(Empty::<Bytes>::new())
- .unwrap();
- let _fut = client.request(req);
- // internal Connect::connect should have been lazy, and not
- // triggered an actual connect yet.
- assert_eq!(connects.load(Ordering::Relaxed), 0);
-}
-
-#[cfg(not(miri))]
-#[test]
-fn client_keep_alive_0() {
- let _ = pretty_env_logger::try_init();
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let rt = runtime();
- let connector = DebugConnector::new();
- let connects = connector.connects.clone();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let (tx1, rx1) = oneshot::channel();
- let (tx2, rx2) = oneshot::channel();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- //drop(server);
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 1");
- let _ = tx1.send(());
-
- let n2 = sock.read(&mut buf).expect("read 2");
- assert_ne!(n2, 0);
- let second_get = "GET /b HTTP/1.1\r\n";
- assert_eq!(s(&buf[..second_get.len()]), second_get);
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 2");
- let _ = tx2.send(());
- });
-
- assert_eq!(connects.load(Ordering::SeqCst), 0);
-
- let rx = rx1;
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req);
- rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
-
- assert_eq!(connects.load(Ordering::SeqCst), 1);
-
- // sleep real quick to let the threadpool put connection in ready
- // state and back into client pool
- thread::sleep(Duration::from_millis(50));
-
- let rx = rx2;
- let req = Request::builder()
- .uri(&*format!("http://{addr}/b"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req);
- rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
-
- assert_eq!(
- connects.load(Ordering::SeqCst),
- 1,
- "second request should still only have 1 connect"
- );
- drop(client);
-}
-
-#[cfg(not(miri))]
-#[test]
-fn client_keep_alive_extra_body() {
- let _ = pretty_env_logger::try_init();
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let rt = runtime();
-
- let connector = DebugConnector::new();
- let connects = connector.connects.clone();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let (tx1, rx1) = oneshot::channel();
- let (tx2, rx2) = oneshot::channel();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello")
- .expect("write 1");
- // the body "hello", while ignored because its a HEAD request, should mean the connection
- // cannot be put back in the pool
- let _ = tx1.send(());
-
- let mut sock2 = server.accept().unwrap().0;
- let n2 = sock2.read(&mut buf).expect("read 2");
- assert_ne!(n2, 0);
- let second_get = "GET /b HTTP/1.1\r\n";
- assert_eq!(s(&buf[..second_get.len()]), second_get);
- sock2
- .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 2");
- let _ = tx2.send(());
- });
-
- assert_eq!(connects.load(Ordering::Relaxed), 0);
-
- let rx = rx1;
- let req = Request::builder()
- .method("HEAD")
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req);
- rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
-
- assert_eq!(connects.load(Ordering::Relaxed), 1);
-
- let rx = rx2;
- let req = Request::builder()
- .uri(&*format!("http://{addr}/b"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req);
- rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
-
- assert_eq!(connects.load(Ordering::Relaxed), 2);
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn client_keep_alive_when_response_before_request_body_ends() {
- let _ = pretty_env_logger::try_init();
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
-
- let (closes_tx, mut closes) = mpsc::channel::<()>(10);
- let connector = DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx);
- let connects = connector.connects.clone();
- let client = Client::builder(TokioExecutor::new()).build(connector.clone());
-
- let (tx1, rx1) = oneshot::channel();
- let (tx2, rx2) = oneshot::channel();
- let (_tx3, rx3) = std::sync::mpsc::channel::<()>();
-
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 1");
- // after writing the response, THEN stream the body
- let _ = tx1.send(());
-
- sock.read(&mut buf).expect("read 2");
- let _ = tx2.send(());
-
- // prevent this thread from closing until end of test, so the connection
- // stays open and idle until Client is dropped
- let _ = rx3.recv();
- });
-
- assert_eq!(connects.load(Ordering::Relaxed), 0);
-
- let delayed_body = rx1
- .then(|_| Box::pin(tokio::time::sleep(Duration::from_millis(200))))
- .map(|_| Ok::<_, ()>(Frame::data(&b"hello a"[..])))
- .map_err(|_| -> hyper::Error { panic!("rx1") })
- .into_stream();
-
- let req = Request::builder()
- .method("POST")
- .uri(&*format!("http://{addr}/a"))
- .body(StreamBody::new(delayed_body))
- .unwrap();
- let res = client.request(req).map_ok(move |res| {
- assert_eq!(res.status(), hyper::StatusCode::OK);
- });
-
- future::join(res, rx2).await.0.unwrap();
- future::poll_fn(|ctx| {
- assert!(Pin::new(&mut closes).poll_next(ctx).is_pending());
- Poll::Ready(())
- })
- .await;
-
- assert_eq!(connects.load(Ordering::Relaxed), 1);
-
- drop(client);
- let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
- futures_util::pin_mut!(t);
- let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
- future::select(t, close).await;
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn client_keep_alive_eager_when_chunked() {
- // If a response body has been read to completion, with completion
- // determined by some other factor, like decompression, and thus
- // it is in't polled a final time to clear the final 0-len chunk,
- // try to eagerly clear it so the connection can still be used.
-
- let _ = pretty_env_logger::try_init();
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let connector = DebugConnector::new();
- let connects = connector.connects.clone();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let (tx1, rx1) = oneshot::channel();
- let (tx2, rx2) = oneshot::channel();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- //drop(server);
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write_all(
- b"\
- HTTP/1.1 200 OK\r\n\
- transfer-encoding: chunked\r\n\
- \r\n\
- 5\r\n\
- hello\r\n\
- 0\r\n\r\n\
- ",
- )
- .expect("write 1");
- let _ = tx1.send(());
-
- let n2 = sock.read(&mut buf).expect("read 2");
- assert_ne!(n2, 0, "bytes of second request");
- let second_get = "GET /b HTTP/1.1\r\n";
- assert_eq!(s(&buf[..second_get.len()]), second_get);
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 2");
- let _ = tx2.send(());
- });
-
- assert_eq!(connects.load(Ordering::SeqCst), 0);
-
- let rx = rx1;
- let req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let fut = client.request(req);
-
- let resp = future::join(fut, rx).map(|r| r.0).await.unwrap();
- assert_eq!(connects.load(Ordering::SeqCst), 1);
- assert_eq!(resp.status(), 200);
- assert_eq!(resp.headers()["transfer-encoding"], "chunked");
-
- // Read the "hello" chunk...
- let chunk = resp.collect().await.unwrap().to_bytes();
- assert_eq!(chunk, "hello");
-
- // sleep real quick to let the threadpool put connection in ready
- // state and back into client pool
- tokio::time::sleep(Duration::from_millis(50)).await;
-
- let rx = rx2;
- let req = Request::builder()
- .uri(&*format!("http://{addr}/b"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let fut = client.request(req);
- future::join(fut, rx).map(|r| r.0).await.unwrap();
-
- assert_eq!(
- connects.load(Ordering::SeqCst),
- 1,
- "second request should still only have 1 connect"
- );
- drop(client);
-}
-
-#[cfg(not(miri))]
-#[test]
-fn connect_proxy_sends_absolute_uri() {
- let _ = pretty_env_logger::try_init();
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let rt = runtime();
- let connector = DebugConnector::new().proxy();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let (tx1, rx1) = oneshot::channel();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- //drop(server);
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- let n = sock.read(&mut buf).expect("read 1");
- let expected = format!("GET http://{addr}/foo/bar HTTP/1.1\r\nhost: {addr}\r\n\r\n");
- assert_eq!(s(&buf[..n]), expected);
-
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 1");
- let _ = tx1.send(());
- });
-
- let rx = rx1;
- let req = Request::builder()
- .uri(&*format!("http://{addr}/foo/bar"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req);
- rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
-}
-
-#[cfg(not(miri))]
-#[test]
-fn connect_proxy_http_connect_sends_authority_form() {
- let _ = pretty_env_logger::try_init();
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let rt = runtime();
- let connector = DebugConnector::new().proxy();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let (tx1, rx1) = oneshot::channel();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- //drop(server);
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- let n = sock.read(&mut buf).expect("read 1");
- let expected = format!("CONNECT {addr} HTTP/1.1\r\nhost: {addr}\r\n\r\n");
- assert_eq!(s(&buf[..n]), expected);
-
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 1");
- let _ = tx1.send(());
- });
-
- let rx = rx1;
- let req = Request::builder()
- .method("CONNECT")
- .uri(&*format!("http://{addr}/useless/path"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let res = client.request(req);
- rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
-}
-
-#[cfg(not(miri))]
-#[test]
-fn client_upgrade() {
- use tokio::io::{AsyncReadExt, AsyncWriteExt};
-
- let _ = pretty_env_logger::try_init();
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let rt = runtime();
-
- let connector = DebugConnector::new();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let (tx1, rx1) = oneshot::channel();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write_all(
- b"\
- HTTP/1.1 101 Switching Protocols\r\n\
- Upgrade: foobar\r\n\
- \r\n\
- foobar=ready\
- ",
- )
- .unwrap();
- let _ = tx1.send(());
-
- let n = sock.read(&mut buf).expect("read 2");
- assert_eq!(&buf[..n], b"foo=bar");
- sock.write_all(b"bar=foo").expect("write 2");
- });
-
- let rx = rx1;
-
- let req = Request::builder()
- .method("GET")
- .uri(&*format!("http://{addr}/up"))
- .body(Empty::<Bytes>::new())
- .unwrap();
-
- let res = client.request(req);
- let res = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
-
- assert_eq!(res.status(), 101);
- let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade");
-
- let parts = upgraded.downcast::<DebugStream>().unwrap();
- assert_eq!(s(&parts.read_buf), "foobar=ready");
-
- let mut io = parts.io;
- rt.block_on(io.write_all(b"foo=bar")).unwrap();
- let mut vec = vec![];
- rt.block_on(io.read_to_end(&mut vec)).unwrap();
- assert_eq!(vec, b"bar=foo");
-}
-
-#[cfg(not(miri))]
-#[test]
-fn client_http2_upgrade() {
- use http::{Method, Response, Version};
- use hyper::service::service_fn;
- use tokio::io::{AsyncReadExt, AsyncWriteExt};
- use tokio::net::TcpListener;
-
- let _ = pretty_env_logger::try_init();
- let rt = runtime();
- let server = rt
- .block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))))
- .unwrap();
- let addr = server.local_addr().unwrap();
- let mut connector = DebugConnector::new();
- connector.alpn_h2 = true;
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- rt.spawn(async move {
- let (stream, _) = server.accept().await.expect("accept");
- let stream = TokioIo::new(stream);
- let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
- // IMPORTANT: This is required to advertise our support for HTTP/2 websockets to the client.
- builder.http2().enable_connect_protocol();
- builder
- .serve_connection_with_upgrades(
- stream,
- service_fn(|req| async move {
- assert_eq!(req.headers().get("host"), None);
- assert_eq!(req.version(), Version::HTTP_2);
- assert_eq!(
- req.headers().get(http::header::SEC_WEBSOCKET_VERSION),
- Some(&http::header::HeaderValue::from_static("13"))
- );
- assert_eq!(
- req.extensions().get::<hyper::ext::Protocol>(),
- Some(&hyper::ext::Protocol::from_static("websocket"))
- );
-
- let on_upgrade = hyper::upgrade::on(req);
- tokio::spawn(async move {
- let upgraded = on_upgrade.await.unwrap();
- let mut io = TokioIo::new(upgraded);
-
- let mut vec = vec![];
- io.read_buf(&mut vec).await.unwrap();
- assert_eq!(vec, b"foo=bar");
- io.write_all(b"bar=foo").await.unwrap();
- });
-
- Ok::<_, hyper::Error>(Response::new(Empty::<Bytes>::new()))
- }),
- )
- .await
- .expect("server");
- });
-
- let req = Request::builder()
- .method(Method::CONNECT)
- .uri(&*format!("http://{addr}/up"))
- .header(http::header::SEC_WEBSOCKET_VERSION, "13")
- .version(Version::HTTP_2)
- .extension(hyper::ext::Protocol::from_static("websocket"))
- .body(Empty::<Bytes>::new())
- .unwrap();
-
- let res = client.request(req);
- let res = rt.block_on(res).unwrap();
-
- assert_eq!(res.status(), http::StatusCode::OK);
- assert_eq!(res.version(), Version::HTTP_2);
-
- let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade");
- let mut io = TokioIo::new(upgraded);
-
- rt.block_on(io.write_all(b"foo=bar")).unwrap();
- let mut vec = vec![];
- rt.block_on(io.read_to_end(&mut vec)).unwrap();
- assert_eq!(vec, b"bar=foo");
-}
-
-#[cfg(not(miri))]
-#[test]
-fn alpn_h2() {
- use http::Response;
- use hyper::service::service_fn;
- use tokio::net::TcpListener;
-
- let _ = pretty_env_logger::try_init();
- let rt = runtime();
- let listener = rt
- .block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))))
- .unwrap();
- let addr = listener.local_addr().unwrap();
- let mut connector = DebugConnector::new();
- connector.alpn_h2 = true;
- let connects = connector.connects.clone();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- rt.spawn(async move {
- let (stream, _) = listener.accept().await.expect("accept");
- let stream = TokioIo::new(stream);
- hyper::server::conn::http2::Builder::new(TokioExecutor::new())
- .serve_connection(
- stream,
- service_fn(|req| async move {
- assert_eq!(req.headers().get("host"), None);
- Ok::<_, hyper::Error>(Response::new(Full::<Bytes>::from("Hello, world")))
- }),
- )
- .await
- .expect("server");
- });
-
- assert_eq!(connects.load(Ordering::SeqCst), 0);
-
- let url = format!("http://{addr}/a").parse::<::hyper::Uri>().unwrap();
- let res1 = client.get(url.clone());
- let res2 = client.get(url.clone());
- let res3 = client.get(url.clone());
- rt.block_on(future::try_join3(res1, res2, res3)).unwrap();
-
- // Since the client doesn't know it can ALPN at first, it will have
- // started 3 connections. But, the server above will only handle 1,
- // so the unwrapped responses futures show it still worked.
- assert_eq!(connects.load(Ordering::SeqCst), 3);
-
- let res4 = client.get(url.clone());
- rt.block_on(res4).unwrap();
-
- // HTTP/2 request allowed
- let res5 = client.request(
- Request::builder()
- .uri(url)
- .version(hyper::Version::HTTP_2)
- .body(Empty::<Bytes>::new())
- .unwrap(),
- );
- rt.block_on(res5).unwrap();
-
- assert_eq!(
- connects.load(Ordering::SeqCst),
- 3,
- "after ALPN, no more connects"
- );
- drop(client);
-}
-
-#[cfg(not(miri))]
-#[test]
-fn capture_connection_on_client() {
- let _ = pretty_env_logger::try_init();
-
- let rt = runtime();
- let connector = DebugConnector::new();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- thread::spawn(move || {
- let mut sock = server.accept().unwrap().0;
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- sock.read(&mut buf).expect("read 1");
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 1");
- });
- let mut req = Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap();
- let captured_conn = capture_connection(&mut req);
- rt.block_on(client.request(req)).expect("200 OK");
- assert!(captured_conn.connection_metadata().is_some());
-}
-
-#[cfg(not(miri))]
-#[test]
-fn connection_poisoning() {
- use std::sync::atomic::AtomicUsize;
-
- let _ = pretty_env_logger::try_init();
-
- let rt = runtime();
- let connector = DebugConnector::new();
-
- let client = Client::builder(TokioExecutor::new()).build(connector);
-
- let server = TcpListener::bind("127.0.0.1:0").unwrap();
- let addr = server.local_addr().unwrap();
- let num_conns: Arc<AtomicUsize> = Default::default();
- let num_requests: Arc<AtomicUsize> = Default::default();
- let num_requests_tracker = num_requests.clone();
- let num_conns_tracker = num_conns.clone();
- thread::spawn(move || loop {
- let mut sock = server.accept().unwrap().0;
- num_conns_tracker.fetch_add(1, Ordering::Relaxed);
- let num_requests_tracker = num_requests_tracker.clone();
- thread::spawn(move || {
- sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
- sock.set_write_timeout(Some(Duration::from_secs(5)))
- .unwrap();
- let mut buf = [0; 4096];
- loop {
- if sock.read(&mut buf).expect("read 1") > 0 {
- num_requests_tracker.fetch_add(1, Ordering::Relaxed);
- sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
- .expect("write 1");
- }
- }
- });
- });
- let make_request = || {
- Request::builder()
- .uri(&*format!("http://{addr}/a"))
- .body(Empty::<Bytes>::new())
- .unwrap()
- };
- let mut req = make_request();
- let captured_conn = capture_connection(&mut req);
- rt.block_on(client.request(req)).expect("200 OK");
- assert_eq!(num_conns.load(Ordering::SeqCst), 1);
- assert_eq!(num_requests.load(Ordering::SeqCst), 1);
-
- rt.block_on(client.request(make_request())).expect("200 OK");
- rt.block_on(client.request(make_request())).expect("200 OK");
- // Before poisoning the connection is reused
- assert_eq!(num_conns.load(Ordering::SeqCst), 1);
- assert_eq!(num_requests.load(Ordering::SeqCst), 3);
- captured_conn
- .connection_metadata()
- .as_ref()
- .unwrap()
- .poison();
-
- rt.block_on(client.request(make_request())).expect("200 OK");
-
- // After poisoning, a new connection is established
- assert_eq!(num_conns.load(Ordering::SeqCst), 2);
- assert_eq!(num_requests.load(Ordering::SeqCst), 4);
-
- rt.block_on(client.request(make_request())).expect("200 OK");
- // another request can still reuse:
- assert_eq!(num_conns.load(Ordering::SeqCst), 2);
- assert_eq!(num_requests.load(Ordering::SeqCst), 5);
-}
-
-// -------------------------------------------------------
-// Below is our custom code for testing hyper legacy-client behavior with mock connections for PR #184
-// We use fully qualified paths for all types and identifiers to make this code
-// copy/paste-able without relying on external 'use' statements. Detailed inline
-// comments explain the purpose and logic of each section.
-
-//XXX: can manually run like this:
-// $ cargo test --features="http1,http2,server,client-legacy" --test legacy_client -- test_connection_error_propagation test_incomplete_message_error --nocapture
-// $ cargo test --all-features --test legacy_client -- --nocapture
-// $ cargo test --all-features --test legacy_client
-
-use std::error::Error; // needed for .source() eg. error[E0599]: no method named `source` found for struct `hyper_util::client::legacy::Error` in the current scope
-
-// Helper function to debug byte slices by attempting to interpret them as UTF-8.
-// If the bytes are valid UTF-8, they are printed as a string; otherwise, they are
-// printed as a raw byte array. This aids in debugging tokio_test::io::Mock mismatches.
-fn debug_bytes(bytes: &[u8], label: &str) {
- // Try to convert the byte slice to a UTF-8 string.
- // If successful, print it with the provided label for context.
- if let Ok(s) = std::str::from_utf8(bytes) {
- eprintln!("{}: {}", label, s);
- } else {
- // If the bytes are not valid UTF-8, print them as a raw byte array.
- eprintln!("{}: {:?}", label, bytes);
- }
-}
-
-// Struct representing a mock connection for testing hyper client behavior.
-// Implements hyper::rt::Read, hyper::rt::Write, and hyper_util::client::legacy::connect::Connection
-// traits to simulate I/O operations. Uses tokio_test::io::Mock for controlled I/O behavior.
-struct MockConnection {
- // The underlying mock I/O object, wrapped in hyper_util::rt::TokioIo for compatibility.
- inner: hyper_util::rt::TokioIo<tokio_test::io::Mock>,
- // Atomic flag to signal a connection failure, controlling poll_read behavior.
- failed: std::sync::Arc<std::sync::atomic::AtomicBool>,
- // The error to return when failed=true, simulating an I/O failure.
- error: std::sync::Arc<std::io::Error>,
- // Optional channel to signal unexpected writes, used for debugging.
- error_tx: Option<tokio::sync::mpsc::Sender<()>>,
- // Tracks total bytes written, for logging and verification.
- bytes_written: usize,
-}
-
-impl MockConnection {
- // Constructor for MockConnection, initializing all fields.
- // Takes a mock I/O object, failure flag, error, and optional error channel.
- fn new(
- mock: tokio_test::io::Mock,
- failed: std::sync::Arc<std::sync::atomic::AtomicBool>,
- error: std::sync::Arc<std::io::Error>,
- error_tx: Option<tokio::sync::mpsc::Sender<()>>,
- ) -> Self {
- MockConnection {
- inner: hyper_util::rt::TokioIo::new(mock),
- failed,
- error,
- error_tx,
- bytes_written: 0,
- }
- }
-}
-
-// Implement hyper::rt::Read trait to handle read operations on the mock connection.
-// Controls whether an error or mock I/O data is returned based on the failed flag.
-impl hyper::rt::Read for MockConnection {
- // Polls the connection for reading, filling the provided buffer.
- // If failed=true, returns the stored error; otherwise, delegates to the mock I/O.
- fn poll_read(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: hyper::rt::ReadBufCursor<'_>,
- ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
- // Log the current state of the failed flag for debugging.
- eprintln!(
- "poll_read: failed={}",
- self.failed.load(std::sync::atomic::Ordering::SeqCst)
- );
- // Check if the connection is marked as failed.
- // If true, return the stored error immediately to simulate a connection failure.
- if self.failed.load(std::sync::atomic::Ordering::SeqCst) {
- // Log the error being returned for traceability.
- eprintln!("poll_read: returning error: {}", self.error);
- // Create a new io::Error with the same kind and message as the stored error.
- return std::task::Poll::Ready(std::result::Result::Err(std::io::Error::new(
- self.error.kind(),
- self.error.to_string(),
- )));
- }
- // If not failed, delegate to the mock I/O to simulate normal read behavior.
- // This may return EOF (Poll::Ready(Ok(0))) for empty IoBuilder.
- let inner = std::pin::Pin::new(&mut self.inner);
- inner.poll_read(cx, buf)
- }
-}
-
-// Implement hyper::rt::Write trait to handle write operations on the mock connection.
-// Logs writes and signals unexpected writes via error_tx.
-impl hyper::rt::Write for MockConnection {
- // Polls the connection for writing, sending the provided buffer.
- // Logs the write operation and tracks total bytes written.
- fn poll_write(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
- // Log the size of the buffer being written for debugging.
- eprintln!("poll_write: {} bytes", buf.len());
- // Debug the buffer contents as UTF-8 or raw bytes.
- debug_bytes(buf, "poll_write buffer");
- // Delegate the write to the mock I/O object.
- let inner = std::pin::Pin::new(&mut self.inner);
- match inner.poll_write(cx, buf) {
- // If the write succeeds, update the bytes_written counter and log the result.
- std::task::Poll::Ready(std::result::Result::Ok(bytes)) => {
- // Increment the total bytes written for tracking.
- self.bytes_written += bytes;
- // Log the number of bytes written and the running total.
- eprintln!(
- "poll_write: wrote {} bytes, total={}",
- bytes, self.bytes_written
- );
- // If error_tx is present, signal an unexpected write (used in error tests).
- // This helps detect writes when the connection should fail early.
- if let Some(tx) = self.error_tx.take() {
- // Log that an unexpected write is being signaled.
- eprintln!("poll_write: signaling unexpected write");
- // Send a message through the channel, ignoring errors if the receiver is closed.
- let _ = tx.try_send(());
- }
- // Return the successful write result.
- std::task::Poll::Ready(std::result::Result::Ok(bytes))
- }
- // For pending or error results, propagate them directly.
- other => other,
- }
- }
-
- // Polls the connection to flush any buffered data.
- // Delegates to the mock I/O object.
- fn poll_flush(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
- // Log the flush operation for debugging.
- eprintln!("poll_flush");
- // Delegate the flush to the mock I/O object.
- let inner = std::pin::Pin::new(&mut self.inner);
- inner.poll_flush(cx)
- }
-
- // Polls the connection to shut down the write side.
- // Delegates to the mock I/O object.
- fn poll_shutdown(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
- // Log the shutdown operation for debugging.
- eprintln!("poll_shutdown");
- // Delegate the shutdown to the mock I/O object.
- let inner = std::pin::Pin::new(&mut self.inner);
- inner.poll_shutdown(cx)
- }
-}
-
-// Implement hyper_util::client::legacy::connect::Connection trait to provide connection metadata.
-// Required for hyper to use MockConnection as a valid connection.
-impl hyper_util::client::legacy::connect::Connection for MockConnection {
- // Returns metadata about the connection.
- // In this case, a default Connected object indicating a new connection.
- fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
- hyper_util::client::legacy::connect::Connected::new()
- }
-}
-
-// Struct representing a mock connector for creating MockConnection instances.
-// Implements tower_service::Service to integrate with hyper’s client.
-#[derive(Clone)]
-struct MockConnector {
- // The IoBuilder used to create mock I/O objects for each connection.
- io_builder: tokio_test::io::Builder,
- // Optional error to simulate a connection failure, passed to MockConnection.
- conn_error: Option<std::sync::Arc<std::io::Error>>,
-}
-
-impl MockConnector {
- // Constructor for MockConnector, initializing the IoBuilder and optional error.
- fn new(
- io_builder: tokio_test::io::Builder,
- conn_error: Option<std::sync::Arc<std::io::Error>>,
- ) -> Self {
- MockConnector {
- io_builder,
- conn_error,
- }
- }
-}
-
-// Implement tower_service::Service for MockConnector to create MockConnection instances.
-// Takes a hyper::Uri and returns a future resolving to a MockConnection.
-impl tower_service::Service<hyper::Uri> for MockConnector {
- type Response = crate::MockConnection;
- type Error = std::io::Error;
- type Future = std::pin::Pin<
- Box<
- dyn futures_util::Future<Output = std::result::Result<Self::Response, Self::Error>>
- + Send,
- >,
- >;
-
- // Polls the connector to check if it’s ready to handle a request.
- // Always ready, as we don’t have resource constraints.
- fn poll_ready(
- &mut self,
- _cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
- std::task::Poll::Ready(std::result::Result::Ok(()))
- }
-
- // Creates a new MockConnection for the given URI.
- // Configures the connection based on io_builder and conn_error.
- fn call(&mut self, _req: hyper::Uri) -> Self::Future {
- // Clone the IoBuilder to create a fresh mock I/O object.
- let mut io_builder = self.io_builder.clone();
- // Clone the optional connection error for this call.
- let conn_error = self.conn_error.clone();
- // Return a pinned future that creates the MockConnection.
- Box::pin(async move {
- // Build the mock I/O object from the IoBuilder.
- // This defines the I/O behavior (e.g., EOF for empty builder).
- let mock = io_builder.build();
- // Create an atomic flag to track connection failure, initially false.
- let failed = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
- // Set the default error for non-failure cases.
- // Used when conn_error is None, simulating a clean EOF or connection close.
- let error = if let Some(ref err) = conn_error {
- err.clone()
- } else {
- std::sync::Arc::new(std::io::Error::new(
- std::io::ErrorKind::BrokenPipe,
- "connection closed",
- ))
- };
- // Create an mpsc channel for signaling unexpected writes, if conn_error is set.
- // This helps debug cases where writes occur despite an expected failure.
- let error_tx = if conn_error.is_some() {
- // Create a channel with a buffer of 1 for signaling writes.
- let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
- // Spawn a task to log unexpected writes when received.
- tokio::spawn(async move {
- // Wait for a message indicating a write occurred.
- if rx.recv().await.is_some() {
- // Log the unexpected write for debugging.
- eprintln!("Unexpected write occurred");
- }
- });
- Some(tx)
- } else {
- None
- };
- // If a connection error is provided, mark the connection as failed.
- // This causes poll_read to return the error immediately.
- if let Some(err_clone) = conn_error {
- // Set the failed flag to true atomically.
- failed.store(true, std::sync::atomic::Ordering::SeqCst);
- // Log the simulated error for traceability.
- eprintln!("Simulated conn task error: {}", err_clone);
- }
- // Create and return the MockConnection with all configured components.
- std::result::Result::Ok(crate::MockConnection::new(mock, failed, error, error_tx))
- })
- }
-}
-
-// Test for connection error propagation with PR #184.
-// Simulates a connection failure by setting failed=true and returning a custom io::Error.
-// Verifies the error propagates through hyper’s client as a hyper::Error(Io, ...).
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_connection_error_propagation_pr184() {
- // Define the error message for the simulated connection failure.
- // Reused for creating the error and verifying the result.
- let err_str = "mock connection failure";
- // Create an io::Error with Other kind and the custom message.
- // Wrapped in Arc for sharing across threads and MockConnection.
- let io_error = std::sync::Arc::new(std::io::Error::new(std::io::ErrorKind::Other, err_str));
- // Create an empty IoBuilder, as no I/O is expected.
- // The error triggers before any reads or writes occur.
- let io_builder = tokio_test::io::Builder::new();
- // Create a MockConnector with the error to simulate a failed connection.
- // The error will set failed=true in MockConnection.
- let connector = crate::MockConnector::new(io_builder, Some(io_error.clone()));
- // Build the hyper client with TokioExecutor and our connector.
- // pool_max_idle_per_host(0) disables connection pooling for a fresh connection.
- let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
- .pool_max_idle_per_host(0)
- .build::<_, http_body_util::Empty<hyper::body::Bytes>>(connector);
- // Build a GET request to a mock URI with custom headers.
- // Uses mixed-case headers to match your style, ensuring case-insensitive handling.
- let request = hyper::Request::builder()
- .uri("http://mocked")
- .header("hoSt", "mocked")
- .header("conNection", "close")
- .body(http_body_util::Empty::<hyper::body::Bytes>::new())
- .expect("failed to build request");
- // Send the request and capture the result.
- // Expect it to fail due to the simulated connection error.
- let result = client.request(request).await;
- // Extract the error, as the request should fail.
- let err = result.expect_err("expected request to fail");
- // Log the full error for debugging, including its structure.
- // Matches your detailed logging style for traceability.
- eprintln!("Actually gotten error is: {:?}", err);
- // Downcast the error to a hyper::Error to verify its type.
- // Expect a hyper::Error wrapping an io::Error from MockConnection.
- let hyper_err = err
- .source()
- .and_then(|e| e.downcast_ref::<hyper::Error>())
- .expect("expected hyper::Error");
- // Downcast the hyper::Error’s source to an io::Error.
- // Verify it matches the simulated error from MockConnection.
- let io_err = hyper_err
- .source()
- .and_then(|e| e.downcast_ref::<std::io::Error>())
- .expect(&format!("expected io::Error but got {:?}", hyper_err));
- // Verify the io::Error has the expected kind (Other).
- assert_eq!(io_err.kind(), std::io::ErrorKind::Other);
- // Verify the io::Error’s message matches err_str.
- assert_eq!(io_err.to_string(), err_str);
-}
-
-// Test for consistent IncompleteMessage error with or without PR #184.
-// Simulates a connection that returns EOF immediately, causing hyper’s HTTP/1.1 parser
-// to fail with IncompleteMessage due to no response data.
-// Uses MockConnector with conn_error=None to keep failed=false, ensuring EOF behavior.
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_incomplete_message_error_pr184() {
- // Create an empty IoBuilder to simulate a connection with no data.
- // No write or read expectations, so poll_read returns EOF (Poll::Ready(Ok(0))).
- // This triggers IncompleteMessage in hyper’s parser.
- let io_builder = tokio_test::io::Builder::new();
- // Create MockConnector with no error (conn_error=None).
- // Keeps failed=false in MockConnection, so poll_read delegates to the mock’s EOF.
- let connector = crate::MockConnector::new(io_builder, None);
- // Build the hyper client with TokioExecutor and our connector.
- // pool_max_idle_per_host(0) disables pooling for a fresh connection.
- let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
- .pool_max_idle_per_host(0)
- .build::<_, http_body_util::Empty<hyper::body::Bytes>>(connector);
- // Build a GET request to a mock URI with headers.
- // Uses mixed-case headers to match test_connection_error_propagation_pr184.
- // Empty body ensures focus on response parsing failure.
- let request = hyper::Request::builder()
- .uri("http://mocked")
- .header("hoSt", "mocked")
- .header("conNection", "close")
- .body(http_body_util::Empty::<hyper::body::Bytes>::new())
- .expect("failed to build request");
- // Send the request and capture the result.
- // Expect failure due to EOF causing IncompleteMessage.
- let result = client.request(request).await;
- // Extract the error, as the request should fail.
- // Without PR #184, expect ChannelClosed; with PR #184, expect IncompleteMessage.
- let err = result.expect_err("expected request to fail");
- // Log the full error for debugging, matching your style.
- eprintln!("Actually gotten error is: {:?}", err);
- // Downcast to hyper::Error to verify the error type.
- // Expect IncompleteMessage (with PR #184) or ChannelClosed (without).
- let hyper_err = err
- .source()
- .and_then(|e| e.downcast_ref::<hyper::Error>())
- .expect("expected hyper::Error");
- // Verify the error is IncompleteMessage when PR #184 is applied.
- // This checks the parser’s failure due to EOF.
- assert!(
- hyper_err.is_incomplete_message(),
- "expected IncompleteMessage, got {:?}",
- hyper_err
- );
- // Confirm no io::Error is present, as this is a parsing failure, not I/O.
- // Ensures we’re testing the correct error type.
- assert!(
- hyper_err
- .source()
- .and_then(|e| e.downcast_ref::<std::io::Error>())
- .is_none(),
- "expected no io::Error, got {:?}",
- hyper_err
- );
-}
-
-// Test for a successful HTTP/1.1 connection using a mock connector.
-// Simulates a server that accepts a request and responds with a 200 OK.
-// Verifies the client correctly sends the request and receives the response.
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_successful_connection() {
- // Define the expected server response: a valid HTTP/1.1 200 OK with no body.
- let response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
- // Define the expected client request, including headers and CRLF termination.
- // This ensures the client sends the correct request format.
- let expected_request = b"GET / HTTP/1.1\r\nhost: mocked\r\nconnection: close\r\n\r\n";
- // Create an IoBuilder to simulate the server’s I/O behavior.
- // Expect the client to write the request and read the response.
- let mut io_builder = tokio_test::io::Builder::new();
- // Configure the IoBuilder to expect the request and provide the response.
- io_builder.write(expected_request).read(response);
- // Finalize the IoBuilder for use in the connector.
- let io_builder = io_builder;
- // Create a MockConnector with no error (conn_error=None).
- // Ensures failed=false, allowing normal I/O operations.
- let connector = crate::MockConnector::new(io_builder, None);
- // Build the hyper client with TokioExecutor and our connector.
- // pool_max_idle_per_host(0) ensures a fresh connection.
- let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
- .pool_max_idle_per_host(0)
- .build::<_, http_body_util::Empty<hyper::body::Bytes>>(connector);
- // Build a GET request to a mock URI with headers.
- // Uses mixed-case headers to match your style and verify case-insensitive handling.
- let request = hyper::Request::builder()
- .uri("http://mocked")
- .header("hOst", "mocked")
- .header("coNnection", "close")
- .body(http_body_util::Empty::<hyper::body::Bytes>::new())
- .expect("failed to build request");
- // Send the request and capture the response.
- // Expect a successful response due to the configured IoBuilder.
- let response = client
- .request(request)
- .await
- .expect("request should succeed");
- // Verify the response status is 200 OK.
- assert_eq!(response.status(), 200);
-}
diff --git a/vendor/hyper-util/tests/proxy.rs b/vendor/hyper-util/tests/proxy.rs
deleted file mode 100644
index 95f4bc2f..00000000
--- a/vendor/hyper-util/tests/proxy.rs
+++ /dev/null
@@ -1,478 +0,0 @@
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::net::{TcpListener, TcpStream};
-use tower_service::Service;
-
-use hyper_util::client::legacy::connect::proxy::{SocksV4, SocksV5, Tunnel};
-use hyper_util::client::legacy::connect::HttpConnector;
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_tunnel_works() {
- let tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let addr = tcp.local_addr().expect("local_addr");
-
- let proxy_dst = format!("http://{addr}").parse().expect("uri");
- let mut connector = Tunnel::new(proxy_dst, HttpConnector::new());
- let t1 = tokio::spawn(async move {
- let _conn = connector
- .call("https://hyper.rs".parse().unwrap())
- .await
- .expect("tunnel");
- });
-
- let t2 = tokio::spawn(async move {
- let (mut io, _) = tcp.accept().await.expect("accept");
- let mut buf = [0u8; 64];
- let n = io.read(&mut buf).await.expect("read 1");
- assert_eq!(
- &buf[..n],
- b"CONNECT hyper.rs:443 HTTP/1.1\r\nHost: hyper.rs:443\r\n\r\n"
- );
- io.write_all(b"HTTP/1.1 200 OK\r\n\r\n")
- .await
- .expect("write 1");
- });
-
- t1.await.expect("task 1");
- t2.await.expect("task 2");
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_socks_v5_without_auth_works() {
- let proxy_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let proxy_addr = proxy_tcp.local_addr().expect("local_addr");
- let proxy_dst = format!("http://{proxy_addr}").parse().expect("uri");
-
- let target_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let target_addr = target_tcp.local_addr().expect("local_addr");
- let target_dst = format!("http://{target_addr}").parse().expect("uri");
-
- let mut connector = SocksV5::new(proxy_dst, HttpConnector::new());
-
- // Client
- //
- // Will use `SocksV5` to establish proxy tunnel.
- // Will send "Hello World!" to the target and receive "Goodbye!" back.
- let t1 = tokio::spawn(async move {
- let conn = connector.call(target_dst).await.expect("tunnel");
- let mut tcp = conn.into_inner();
-
- tcp.write_all(b"Hello World!").await.expect("write 1");
-
- let mut buf = [0u8; 64];
- let n = tcp.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], b"Goodbye!");
- });
-
- // Proxy
- //
- // Will receive CONNECT command from client.
- // Will connect to target and success code back to client.
- // Will blindly tunnel between client and target.
- let t2 = tokio::spawn(async move {
- let (mut to_client, _) = proxy_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 513];
-
- // negotiation req/res
- let n = to_client.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], [0x05, 0x01, 0x00]);
-
- to_client.write_all(&[0x05, 0x00]).await.expect("write 1");
-
- // command req/rs
- let [p1, p2] = target_addr.port().to_be_bytes();
- let [ip1, ip2, ip3, ip4] = [0x7f, 0x00, 0x00, 0x01];
- let message = [0x05, 0x01, 0x00, 0x01, ip1, ip2, ip3, ip4, p1, p2];
- let n = to_client.read(&mut buf).await.expect("read 2");
- assert_eq!(&buf[..n], message);
-
- let mut to_target = TcpStream::connect(target_addr).await.expect("connect");
-
- let message = [0x05, 0x00, 0x00, 0x01, ip1, ip2, ip3, ip4, p1, p2];
- to_client.write_all(&message).await.expect("write 2");
-
- let (from_client, from_target) =
- tokio::io::copy_bidirectional(&mut to_client, &mut to_target)
- .await
- .expect("proxy");
-
- assert_eq!(from_client, 12);
- assert_eq!(from_target, 8)
- });
-
- // Target server
- //
- // Will accept connection from proxy server
- // Will receive "Hello World!" from the client and return "Goodbye!"
- let t3 = tokio::spawn(async move {
- let (mut io, _) = target_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 64];
-
- let n = io.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], b"Hello World!");
-
- io.write_all(b"Goodbye!").await.expect("write 1");
- });
-
- t1.await.expect("task - client");
- t2.await.expect("task - proxy");
- t3.await.expect("task - target");
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_socks_v5_with_auth_works() {
- let proxy_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let proxy_addr = proxy_tcp.local_addr().expect("local_addr");
- let proxy_dst = format!("http://{proxy_addr}").parse().expect("uri");
-
- let target_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let target_addr = target_tcp.local_addr().expect("local_addr");
- let target_dst = format!("http://{target_addr}").parse().expect("uri");
-
- let mut connector =
- SocksV5::new(proxy_dst, HttpConnector::new()).with_auth("user".into(), "pass".into());
-
- // Client
- //
- // Will use `SocksV5` to establish proxy tunnel.
- // Will send "Hello World!" to the target and receive "Goodbye!" back.
- let t1 = tokio::spawn(async move {
- let conn = connector.call(target_dst).await.expect("tunnel");
- let mut tcp = conn.into_inner();
-
- tcp.write_all(b"Hello World!").await.expect("write 1");
-
- let mut buf = [0u8; 64];
- let n = tcp.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], b"Goodbye!");
- });
-
- // Proxy
- //
- // Will receive CONNECT command from client.
- // Will connect to target and success code back to client.
- // Will blindly tunnel between client and target.
- let t2 = tokio::spawn(async move {
- let (mut to_client, _) = proxy_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 513];
-
- // negotiation req/res
- let n = to_client.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], [0x05, 0x01, 0x02]);
-
- to_client.write_all(&[0x05, 0x02]).await.expect("write 1");
-
- // auth req/res
- let n = to_client.read(&mut buf).await.expect("read 2");
- let [u1, u2, u3, u4] = b"user";
- let [p1, p2, p3, p4] = b"pass";
- let message = [0x01, 0x04, *u1, *u2, *u3, *u4, 0x04, *p1, *p2, *p3, *p4];
- assert_eq!(&buf[..n], message);
-
- to_client.write_all(&[0x01, 0x00]).await.expect("write 2");
-
- // command req/res
- let n = to_client.read(&mut buf).await.expect("read 3");
- let [p1, p2] = target_addr.port().to_be_bytes();
- let [ip1, ip2, ip3, ip4] = [0x7f, 0x00, 0x00, 0x01];
- let message = [0x05, 0x01, 0x00, 0x01, ip1, ip2, ip3, ip4, p1, p2];
- assert_eq!(&buf[..n], message);
-
- let mut to_target = TcpStream::connect(target_addr).await.expect("connect");
-
- let message = [0x05, 0x00, 0x00, 0x01, ip1, ip2, ip3, ip4, p1, p2];
- to_client.write_all(&message).await.expect("write 3");
-
- let (from_client, from_target) =
- tokio::io::copy_bidirectional(&mut to_client, &mut to_target)
- .await
- .expect("proxy");
-
- assert_eq!(from_client, 12);
- assert_eq!(from_target, 8)
- });
-
- // Target server
- //
- // Will accept connection from proxy server
- // Will receive "Hello World!" from the client and return "Goodbye!"
- let t3 = tokio::spawn(async move {
- let (mut io, _) = target_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 64];
-
- let n = io.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], b"Hello World!");
-
- io.write_all(b"Goodbye!").await.expect("write 1");
- });
-
- t1.await.expect("task - client");
- t2.await.expect("task - proxy");
- t3.await.expect("task - target");
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_socks_v5_with_server_resolved_domain_works() {
- let proxy_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let proxy_addr = proxy_tcp.local_addr().expect("local_addr");
- let proxy_addr = format!("http://{proxy_addr}").parse().expect("uri");
-
- let mut connector = SocksV5::new(proxy_addr, HttpConnector::new())
- .with_auth("user".into(), "pass".into())
- .local_dns(false);
-
- // Client
- //
- // Will use `SocksV5` to establish proxy tunnel.
- // Will send "Hello World!" to the target and receive "Goodbye!" back.
- let t1 = tokio::spawn(async move {
- let _conn = connector
- .call("https://hyper.rs:443".try_into().unwrap())
- .await
- .expect("tunnel");
- });
-
- // Proxy
- //
- // Will receive CONNECT command from client.
- // Will connect to target and success code back to client.
- // Will blindly tunnel between client and target.
- let t2 = tokio::spawn(async move {
- let (mut to_client, _) = proxy_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 513];
-
- // negotiation req/res
- let n = to_client.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], [0x05, 0x01, 0x02]);
-
- to_client.write_all(&[0x05, 0x02]).await.expect("write 1");
-
- // auth req/res
- let n = to_client.read(&mut buf).await.expect("read 2");
- let [u1, u2, u3, u4] = b"user";
- let [p1, p2, p3, p4] = b"pass";
- let message = [0x01, 0x04, *u1, *u2, *u3, *u4, 0x04, *p1, *p2, *p3, *p4];
- assert_eq!(&buf[..n], message);
-
- to_client.write_all(&[0x01, 0x00]).await.expect("write 2");
-
- // command req/res
- let n = to_client.read(&mut buf).await.expect("read 3");
-
- let host = "hyper.rs";
- let port: u16 = 443;
- let mut message = vec![0x05, 0x01, 0x00, 0x03, host.len() as u8];
- message.extend(host.bytes());
- message.extend(port.to_be_bytes());
- assert_eq!(&buf[..n], message);
-
- let mut message = vec![0x05, 0x00, 0x00, 0x03, host.len() as u8];
- message.extend(host.bytes());
- message.extend(port.to_be_bytes());
- to_client.write_all(&message).await.expect("write 3");
- });
-
- t1.await.expect("task - client");
- t2.await.expect("task - proxy");
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_socks_v5_with_locally_resolved_domain_works() {
- let proxy_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let proxy_addr = proxy_tcp.local_addr().expect("local_addr");
- let proxy_addr = format!("http://{proxy_addr}").parse().expect("uri");
-
- let mut connector = SocksV5::new(proxy_addr, HttpConnector::new())
- .with_auth("user".into(), "pass".into())
- .local_dns(true);
-
- // Client
- //
- // Will use `SocksV5` to establish proxy tunnel.
- // Will send "Hello World!" to the target and receive "Goodbye!" back.
- let t1 = tokio::spawn(async move {
- let _conn = connector
- .call("https://hyper.rs:443".try_into().unwrap())
- .await
- .expect("tunnel");
- });
-
- // Proxy
- //
- // Will receive CONNECT command from client.
- // Will connect to target and success code back to client.
- // Will blindly tunnel between client and target.
- let t2 = tokio::spawn(async move {
- let (mut to_client, _) = proxy_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 513];
-
- // negotiation req/res
- let n = to_client.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], [0x05, 0x01, 0x02]);
-
- to_client.write_all(&[0x05, 0x02]).await.expect("write 1");
-
- // auth req/res
- let n = to_client.read(&mut buf).await.expect("read 2");
- let [u1, u2, u3, u4] = b"user";
- let [p1, p2, p3, p4] = b"pass";
- let message = [0x01, 0x04, *u1, *u2, *u3, *u4, 0x04, *p1, *p2, *p3, *p4];
- assert_eq!(&buf[..n], message);
-
- to_client.write_all(&[0x01, 0x00]).await.expect("write 2");
-
- // command req/res
- let n = to_client.read(&mut buf).await.expect("read 3");
- let message = [0x05, 0x01, 0x00];
- assert_eq!(&buf[..3], message);
- assert!(buf[3] == 0x01 || buf[3] == 0x04); // IPv4 or IPv6
- assert_eq!(n, 4 + 4 * (buf[3] as usize) + 2);
-
- let message = vec![0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0];
- to_client.write_all(&message).await.expect("write 3");
- });
-
- t1.await.expect("task - client");
- t2.await.expect("task - proxy");
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_socks_v4_works() {
- let proxy_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let proxy_addr = proxy_tcp.local_addr().expect("local_addr");
- let proxy_dst = format!("http://{proxy_addr}").parse().expect("uri");
-
- let target_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let target_addr = target_tcp.local_addr().expect("local_addr");
- let target_dst = format!("http://{target_addr}").parse().expect("uri");
-
- let mut connector = SocksV4::new(proxy_dst, HttpConnector::new());
-
- // Client
- //
- // Will use `SocksV4` to establish proxy tunnel.
- // Will send "Hello World!" to the target and receive "Goodbye!" back.
- let t1 = tokio::spawn(async move {
- let conn = connector.call(target_dst).await.expect("tunnel");
- let mut tcp = conn.into_inner();
-
- tcp.write_all(b"Hello World!").await.expect("write 1");
-
- let mut buf = [0u8; 64];
- let n = tcp.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], b"Goodbye!");
- });
-
- // Proxy
- //
- // Will receive CONNECT command from client.
- // Will connect to target and success code back to client.
- // Will blindly tunnel between client and target.
- let t2 = tokio::spawn(async move {
- let (mut to_client, _) = proxy_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 512];
-
- let [p1, p2] = target_addr.port().to_be_bytes();
- let [ip1, ip2, ip3, ip4] = [127, 0, 0, 1];
- let message = [4, 0x01, p1, p2, ip1, ip2, ip3, ip4, 0, 0];
- let n = to_client.read(&mut buf).await.expect("read");
- assert_eq!(&buf[..n], message);
-
- let mut to_target = TcpStream::connect(target_addr).await.expect("connect");
-
- let message = [0, 90, p1, p2, ip1, ip2, ip3, ip4];
- to_client.write_all(&message).await.expect("write");
-
- let (from_client, from_target) =
- tokio::io::copy_bidirectional(&mut to_client, &mut to_target)
- .await
- .expect("proxy");
-
- assert_eq!(from_client, 12);
- assert_eq!(from_target, 8)
- });
-
- // Target server
- //
- // Will accept connection from proxy server
- // Will receive "Hello World!" from the client and return "Goodbye!"
- let t3 = tokio::spawn(async move {
- let (mut io, _) = target_tcp.accept().await.expect("accept");
- let mut buf = [0u8; 64];
-
- let n = io.read(&mut buf).await.expect("read 1");
- assert_eq!(&buf[..n], b"Hello World!");
-
- io.write_all(b"Goodbye!").await.expect("write 1");
- });
-
- t1.await.expect("task - client");
- t2.await.expect("task - proxy");
- t3.await.expect("task - target");
-}
-
-#[cfg(not(miri))]
-#[tokio::test]
-async fn test_socks_v5_optimistic_works() {
- let proxy_tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
- let proxy_addr = proxy_tcp.local_addr().expect("local_addr");
- let proxy_dst = format!("http://{proxy_addr}").parse().expect("uri");
-
- let target_addr = std::net::SocketAddr::new([127, 0, 0, 1].into(), 1234);
- let target_dst = format!("http://{target_addr}").parse().expect("uri");
-
- let mut connector = SocksV5::new(proxy_dst, HttpConnector::new())
- .with_auth("ABC".into(), "XYZ".into())
- .send_optimistically(true);
-
- // Client
- //
- // Will use `SocksV5` to establish proxy tunnel.
- // Will send "Hello World!" to the target and receive "Goodbye!" back.
- let t1 = tokio::spawn(async move {
- let _ = connector.call(target_dst).await.expect("tunnel");
- });
-
- // Proxy
- //
- // Will receive SOCKS handshake from client.
- // Will connect to target and success code back to client.
- // Will blindly tunnel between client and target.
- let t2 = tokio::spawn(async move {
- let (mut to_client, _) = proxy_tcp.accept().await.expect("accept");
- let [p1, p2] = target_addr.port().to_be_bytes();
-
- let mut buf = [0; 22];
- let request = vec![
- 5, 1, 2, // Negotiation
- 1, 3, 65, 66, 67, 3, 88, 89, 90, // Auth ("ABC"/"XYZ")
- 5, 1, 0, 1, 127, 0, 0, 1, p1, p2, // Reply
- ];
-
- let response = vec![
- 5, 2, // Negotiation,
- 1, 0, // Auth,
- 5, 0, 0, 1, 127, 0, 0, 1, p1, p2, // Reply
- ];
-
- // Accept all handshake messages
- to_client.read_exact(&mut buf).await.expect("read");
- assert_eq!(request.as_slice(), buf);
-
- // Send all handshake messages back
- to_client
- .write_all(response.as_slice())
- .await
- .expect("write");
-
- to_client.flush().await.expect("flush");
- });
-
- t1.await.expect("task - client");
- t2.await.expect("task - proxy");
-}
diff --git a/vendor/hyper-util/tests/test_utils/mod.rs b/vendor/hyper-util/tests/test_utils/mod.rs
deleted file mode 100644
index df3a65d4..00000000
--- a/vendor/hyper-util/tests/test_utils/mod.rs
+++ /dev/null
@@ -1,175 +0,0 @@
-use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-
-use futures_channel::mpsc;
-use futures_util::task::{Context, Poll};
-use futures_util::Future;
-use futures_util::TryFutureExt;
-use hyper::Uri;
-use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf};
-use tokio::net::TcpStream;
-
-use hyper::rt::ReadBufCursor;
-
-use hyper_util::client::legacy::connect::HttpConnector;
-use hyper_util::client::legacy::connect::{Connected, Connection};
-use hyper_util::rt::TokioIo;
-
-#[derive(Clone)]
-pub struct DebugConnector {
- pub http: HttpConnector,
- pub closes: mpsc::Sender<()>,
- pub connects: Arc<AtomicUsize>,
- pub is_proxy: bool,
- pub alpn_h2: bool,
-}
-
-impl DebugConnector {
- pub fn new() -> DebugConnector {
- let http = HttpConnector::new();
- let (tx, _) = mpsc::channel(10);
- DebugConnector::with_http_and_closes(http, tx)
- }
-
- pub fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector {
- DebugConnector {
- http,
- closes,
- connects: Arc::new(AtomicUsize::new(0)),
- is_proxy: false,
- alpn_h2: false,
- }
- }
-
- pub fn proxy(mut self) -> Self {
- self.is_proxy = true;
- self
- }
-}
-
-impl tower_service::Service<Uri> for DebugConnector {
- type Response = DebugStream;
- type Error = <HttpConnector as tower_service::Service<Uri>>::Error;
- type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
-
- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- // don't forget to check inner service is ready :)
- tower_service::Service::<Uri>::poll_ready(&mut self.http, cx)
- }
-
- fn call(&mut self, dst: Uri) -> Self::Future {
- self.connects.fetch_add(1, Ordering::SeqCst);
- let closes = self.closes.clone();
- let is_proxy = self.is_proxy;
- let is_alpn_h2 = self.alpn_h2;
- Box::pin(self.http.call(dst).map_ok(move |tcp| DebugStream {
- tcp,
- on_drop: closes,
- is_alpn_h2,
- is_proxy,
- }))
- }
-}
-
-pub struct DebugStream {
- tcp: TokioIo<TcpStream>,
- on_drop: mpsc::Sender<()>,
- is_alpn_h2: bool,
- is_proxy: bool,
-}
-
-impl Drop for DebugStream {
- fn drop(&mut self) {
- let _ = self.on_drop.try_send(());
- }
-}
-
-impl Connection for DebugStream {
- fn connected(&self) -> Connected {
- let connected = self.tcp.connected().proxy(self.is_proxy);
-
- if self.is_alpn_h2 {
- connected.negotiated_h2()
- } else {
- connected
- }
- }
-}
-
-impl hyper::rt::Read for DebugStream {
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: ReadBufCursor<'_>,
- ) -> Poll<Result<(), std::io::Error>> {
- hyper::rt::Read::poll_read(Pin::new(&mut self.tcp), cx, buf)
- }
-}
-
-impl hyper::rt::Write for DebugStream {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, std::io::Error>> {
- hyper::rt::Write::poll_write(Pin::new(&mut self.tcp), cx, buf)
- }
-
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), std::io::Error>> {
- hyper::rt::Write::poll_flush(Pin::new(&mut self.tcp), cx)
- }
-
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), std::io::Error>> {
- hyper::rt::Write::poll_shutdown(Pin::new(&mut self.tcp), cx)
- }
-
- fn is_write_vectored(&self) -> bool {
- hyper::rt::Write::is_write_vectored(&self.tcp)
- }
-
- fn poll_write_vectored(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- bufs: &[std::io::IoSlice<'_>],
- ) -> Poll<Result<usize, std::io::Error>> {
- hyper::rt::Write::poll_write_vectored(Pin::new(&mut self.tcp), cx, bufs)
- }
-}
-
-impl AsyncWrite for DebugStream {
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), io::Error>> {
- Pin::new(self.tcp.inner_mut()).poll_shutdown(cx)
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
- Pin::new(self.tcp.inner_mut()).poll_flush(cx)
- }
-
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize, io::Error>> {
- Pin::new(self.tcp.inner_mut()).poll_write(cx, buf)
- }
-}
-
-impl AsyncRead for DebugStream {
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- Pin::new(self.tcp.inner_mut()).poll_read(cx, buf)
- }
-}