// path: root/vendor/reqwest/tests/support/delay_server.rs
// blob: f79c2a4df45ac2a6a0ba95d2a039a536d9e7e8e0
#![cfg(not(target_arch = "wasm32"))]
#![allow(unused)]
use std::convert::Infallible;
use std::future::Future;
use std::net;
use std::time::Duration;

use futures_util::FutureExt;
use http::{Request, Response};
use hyper::service::service_fn;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::oneshot;

/// This server, unlike [`super::server::Server`], allows for delaying the
/// specified amount of time after each TCP connection is established. This is
/// useful for testing the behavior of the client when the server is slow.
///
/// For example, in case of HTTP/2, once the TCP/TLS connection is established,
/// both endpoints are supposed to send a preface and an initial `SETTINGS`
/// frame (See [RFC9113 3.4] for details). What if these frames are delayed for
/// whatever reason? This server allows for testing such scenarios.
///
/// [RFC9113 3.4]: https://www.rfc-editor.org/rfc/rfc9113.html#name-http-2-connection-preface
pub struct Server {
    /// Local address the listener was bound to; returned by `addr()`.
    addr: net::SocketAddr,
    /// Sending half of the channel that tells the accept loop to stop.
    /// `shutdown()` takes it out (leaving `None`) before sending the signal.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Receiving half of the channel on which the background task reports
    /// that the accept loop has stopped and all connection tasks were joined.
    server_terminated_rx: oneshot::Receiver<()>,
}

/// The `hyper_util` `auto` connection builder running on a `TokioExecutor`.
/// This is the type handed to the `apply_config` callback of `Server::new`.
type Builder = hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>;

impl Server {
    /// Binds a listener on `127.0.0.1:0` and starts serving connections on
    /// spawned Tokio tasks.
    ///
    /// * `func` is invoked for every request and produces the response.
    /// * `apply_config` is called once on the connection [`Builder`] before
    ///   the accept loop starts; its return value is discarded.
    /// * `delay` is how long each connection task sleeps after the TCP
    ///   connection has been accepted and before the connection future is
    ///   polled for the first time.
    ///
    /// # Panics
    ///
    /// Panics if the listener cannot be bound or its local address cannot be
    /// read. The background tasks panic if accepting a connection fails or if
    /// a connection returns an error while it is being shut down gracefully.
    pub async fn new<F1, Fut, F2, Bu>(func: F1, apply_config: F2, delay: Duration) -> Self
    where
        F1: Fn(Request<hyper::body::Incoming>) -> Fut + Clone + Send + 'static,
        Fut: Future<Output = Response<reqwest::Body>> + Send + 'static,
        F2: FnOnce(&mut Builder) -> Bu + Send + 'static,
    {
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let (server_terminated_tx, server_terminated_rx) = oneshot::channel();

        // Port 0 lets the OS pick a free port; the resulting address is
        // exposed through `addr()`.
        let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = tcp_listener.local_addr().unwrap();

        // The outer task only builds and configures the connection builder,
        // then hands everything over to the inner task running the accept loop.
        tokio::spawn(async move {
            let mut builder =
                hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());
            apply_config(&mut builder);

            tokio::spawn(async move {
                let builder = builder;
                let (connection_shutdown_tx, connection_shutdown_rx) = oneshot::channel();
                // `shared()` makes the receiver cloneable so that every
                // connection task can wait on the same shutdown signal.
                let connection_shutdown_rx = connection_shutdown_rx.shared();
                let mut shutdown_rx = std::pin::pin!(shutdown_rx);

                let mut handles = Vec::new();
                loop {
                    select! {
                        // Completes on an explicit `shutdown()` and also when
                        // the sender is dropped together with the `Server`
                        // (the `_` pattern accepts both `Ok` and `Err`).
                        _ = shutdown_rx.as_mut() => {
                            // The shared receiver is still alive in this
                            // scope, so this send is not expected to fail.
                            connection_shutdown_tx.send(()).unwrap();
                            break;
                        }
                        res = tcp_listener.accept() => {
                            let (stream, _) = res.unwrap();
                            let io = hyper_util::rt::TokioIo::new(stream);

                            let handle = tokio::spawn({
                                let connection_shutdown_rx = connection_shutdown_rx.clone();
                                let func = func.clone();
                                let svc = service_fn(move |req| {
                                    let fut = func(req);
                                    async move { Ok::<_, Infallible>(fut.await) }
                                });
                                let builder = builder.clone();

                                async move {
                                    let fut = builder.serve_connection_with_upgrades(io, svc);
                                    // The connection future is created but not
                                    // polled until the delay has elapsed. Note
                                    // that the shutdown signal is not observed
                                    // during this sleep either.
                                    tokio::time::sleep(delay).await;

                                    let mut conn = std::pin::pin!(fut);

                                    select! {
                                        _ = conn.as_mut() => {}
                                        _ = connection_shutdown_rx => {
                                            conn.as_mut().graceful_shutdown();
                                            conn.await.unwrap();
                                        }
                                    }
                                }
                            });

                            handles.push(handle);
                        }
                    }
                }

                // Wait for every connection task; their results (including
                // panics reported as join errors) are intentionally ignored.
                futures_util::future::join_all(handles).await;
                // The receiver is gone when the `Server` was dropped without
                // calling `shutdown()`. Nobody is waiting in that case, so a
                // failed send must not panic this background task.
                let _ = server_terminated_tx.send(());
            });
        });

        Self {
            addr,
            shutdown_tx: Some(shutdown_tx),
            server_terminated_rx,
        }
    }

    /// Signals the accept loop to stop and waits until the background task
    /// reports that all connection tasks have finished.
    ///
    /// # Panics
    ///
    /// Panics if the background task went away without reporting termination
    /// (for example because it panicked).
    pub async fn shutdown(mut self) {
        if let Some(tx) = self.shutdown_tx.take() {
            // A failed send means the accept loop has already exited.
            let _ = tx.send(());
        }

        self.server_terminated_rx.await.unwrap();
    }

    /// Returns the local address the server is listening on.
    pub fn addr(&self) -> net::SocketAddr {
        self.addr
    }
}