1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::{client::legacy::Client, rt::TokioIo};
use std::{net::SocketAddr, time::Duration};
use tokio::io;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::task;
use hyper_timeout::TimeoutConnector;
async fn spawn_test_server(listener: TcpListener, shutdown_rx: oneshot::Receiver<()>) {
let http = http1::Builder::new();
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let mut signal = std::pin::pin!(shutdown_rx);
loop {
tokio::select! {
Ok((stream, _addr)) = listener.accept() => {
let io = TokioIo::new(stream);
let conn = http.serve_connection(io, service_fn(handle_request));
// watch this connection
let fut = graceful.watch(conn);
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("Error serving connection: {:?}", e);
}
});
},
_ = &mut signal => {
eprintln!("graceful shutdown signal received");
break;
}
}
}
tokio::select! {
_ = graceful.shutdown() => {
eprintln!("all connections gracefully closed");
},
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
eprintln!("timed out wait for all connections to close");
}
}
}
async fn handle_request(
req: Request<hyper::body::Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let body = req.collect().await.expect("Failed to read body").to_bytes();
assert!(!body.is_empty(), "empty body");
Ok(Response::new(full("finished")))
}
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}
#[tokio::test]
async fn test_upload_timeout() {
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TcpListener::bind(addr)
.await
.expect("Failed to bind listener");
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let server_addr = listener.local_addr().unwrap();
let server_handle = task::spawn(spawn_test_server(listener, shutdown_rx));
let h = hyper_util::client::legacy::connect::HttpConnector::new();
let mut connector = TimeoutConnector::new(h);
connector.set_read_timeout(Some(Duration::from_millis(5)));
// comment this out and the test will fail
connector.set_reset_reader_on_write(true);
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(connector);
let body = vec![0; 10 * 1024 * 1024]; // 10MB
let req = Request::post(format!("http://{}/", server_addr))
.body(full(body))
.expect("request builder");
let mut res = client.request(req).await.expect("request failed");
let mut resp_body = Vec::new();
while let Some(frame) = res.body_mut().frame().await {
let bytes = frame
.expect("frame error")
.into_data()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Error when consuming frame"))
.expect("data error");
resp_body.extend_from_slice(&bytes);
}
assert_eq!(res.status(), 200);
assert_eq!(resp_body, b"finished");
let _ = shutdown_tx.send(());
let _ = server_handle.await;
}
|