summaryrefslogtreecommitdiff
path: root/vendor/hyper-timeout/tests/client_upload.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/hyper-timeout/tests/client_upload.rs')
-rw-r--r--vendor/hyper-timeout/tests/client_upload.rs109
1 files changed, 109 insertions, 0 deletions
diff --git a/vendor/hyper-timeout/tests/client_upload.rs b/vendor/hyper-timeout/tests/client_upload.rs
new file mode 100644
index 00000000..531b9afa
--- /dev/null
+++ b/vendor/hyper-timeout/tests/client_upload.rs
@@ -0,0 +1,109 @@
+use http_body_util::{combinators::BoxBody, BodyExt, Full};
+use hyper::body::Bytes;
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{Request, Response};
+use hyper_util::{client::legacy::Client, rt::TokioIo};
+use std::{net::SocketAddr, time::Duration};
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::sync::oneshot;
+use tokio::task;
+
+use hyper_timeout::TimeoutConnector;
+
+async fn spawn_test_server(listener: TcpListener, shutdown_rx: oneshot::Receiver<()>) {
+ let http = http1::Builder::new();
+ let graceful = hyper_util::server::graceful::GracefulShutdown::new();
+ let mut signal = std::pin::pin!(shutdown_rx);
+
+ loop {
+ tokio::select! {
+ Ok((stream, _addr)) = listener.accept() => {
+ let io = TokioIo::new(stream);
+ let conn = http.serve_connection(io, service_fn(handle_request));
+ // watch this connection
+ let fut = graceful.watch(conn);
+ tokio::spawn(async move {
+ if let Err(e) = fut.await {
+ eprintln!("Error serving connection: {:?}", e);
+ }
+ });
+ },
+
+ _ = &mut signal => {
+ eprintln!("graceful shutdown signal received");
+ break;
+ }
+ }
+ }
+
+ tokio::select! {
+ _ = graceful.shutdown() => {
+ eprintln!("all connections gracefully closed");
+ },
+ _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
+ eprintln!("timed out wait for all connections to close");
+ }
+ }
+}
+
+async fn handle_request(
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
+ let body = req.collect().await.expect("Failed to read body").to_bytes();
+ assert!(!body.is_empty(), "empty body");
+
+ Ok(Response::new(full("finished")))
+}
+
+fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
+ Full::new(chunk.into())
+ .map_err(|never| match never {})
+ .boxed()
+}
+
+#[tokio::test]
+async fn test_upload_timeout() {
+ let addr = SocketAddr::from(([127, 0, 0, 1], 0));
+ let listener = TcpListener::bind(addr)
+ .await
+ .expect("Failed to bind listener");
+ let (shutdown_tx, shutdown_rx) = oneshot::channel();
+
+ let server_addr = listener.local_addr().unwrap();
+
+ let server_handle = task::spawn(spawn_test_server(listener, shutdown_rx));
+
+ let h = hyper_util::client::legacy::connect::HttpConnector::new();
+ let mut connector = TimeoutConnector::new(h);
+ connector.set_read_timeout(Some(Duration::from_millis(5)));
+
+ // comment this out and the test will fail
+ connector.set_reset_reader_on_write(true);
+
+ let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(connector);
+
+ let body = vec![0; 10 * 1024 * 1024]; // 10MB
+ let req = Request::post(format!("http://{}/", server_addr))
+ .body(full(body))
+ .expect("request builder");
+
+ let mut res = client.request(req).await.expect("request failed");
+
+ let mut resp_body = Vec::new();
+ while let Some(frame) = res.body_mut().frame().await {
+ let bytes = frame
+ .expect("frame error")
+ .into_data()
+ .map_err(|_| io::Error::new(io::ErrorKind::Other, "Error when consuming frame"))
+ .expect("data error");
+ resp_body.extend_from_slice(&bytes);
+ }
+
+ assert_eq!(res.status(), 200);
+ assert_eq!(resp_body, b"finished");
+
+ let _ = shutdown_tx.send(());
+ let _ = server_handle.await;
+}