Skip to content

Commit c7dc83e

Browse files
committed
Refactor test architecture to run emulated socket servers on a separate OS thread with a tokio runtime
Signed-off-by: kanpov <karpovanton729@gmail.com>
1 parent 8cec83c commit c7dc83e

File tree

4 files changed

+127
-98
lines changed

4 files changed

+127
-98
lines changed

Cargo.lock

Lines changed: 2 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,18 @@ futures-util = { version = "0.3.31", default-features = false, features = [
4141
vsock = { version = "0.5.1", optional = true }
4242

4343
[dev-dependencies]
44+
# general utils
4445
http-body-util = "0.1.2"
4546
hyper-util = { version = "0.1.6", features = ["http1"] }
4647
hyper-client-sockets = { path = ".", features = ["full"] }
47-
tokio = { version = "1.38.0", features = ["macros", "fs"] }
4848
hyper = { version = "1.4.1", features = ["server"] }
4949
uuid = { version = "1.10.0", features = ["v4"] }
5050
bytes = "1.7.2"
5151
rand = "0.8.5"
52-
smol-macros = "0.1.1"
53-
macro_rules_attribute = "0.2.0"
52+
# concrete runtimes
53+
tokio = { version = "1.38.0", features = ["macros", "fs"] }
54+
async-executor = "1.13.1"
55+
smol-hyper = { version = "0.1.1", features = ["async-executor"] }
5456

5557
[features]
5658
default = []

tests/async_io.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,41 @@
1+
use std::{future::Future, sync::Arc};
12

3+
use async_executor::Executor;
4+
use async_io::block_on;
5+
use bytes::Bytes;
6+
use common::{assert_response_ok, start_unix_server};
7+
use http::{Request, Uri};
8+
use http_body_util::Full;
9+
use hyper_client_sockets::{
10+
unix::{connector::HyperUnixConnector, UnixUriExt},
11+
Backend,
12+
};
13+
use hyper_util::client::legacy::Client;
14+
use smol_hyper::rt::SmolExecutor;
15+
16+
mod common;
17+
18+
#[test]
19+
fn unix_connectivity_with_hyper_util() {
20+
run_test(|executor| async {
21+
let path = start_unix_server();
22+
let client: Client<_, Full<Bytes>> =
23+
Client::builder(SmolExecutor::new(executor)).build(HyperUnixConnector::new(Backend::AsyncIo));
24+
let request = Request::builder()
25+
.uri(Uri::unix(path, "/").unwrap())
26+
.method("GET")
27+
.body(Full::new(Bytes::new()))
28+
.unwrap();
29+
let mut response = client.request(request).await.unwrap();
30+
assert_response_ok(&mut response).await;
31+
});
32+
}
33+
34+
fn run_test<F, Fut>(function: F)
35+
where
36+
F: FnOnce(Arc<Executor<'static>>) -> Fut,
37+
Fut: Future<Output = ()>,
38+
{
39+
let executor = Arc::new(Executor::new());
40+
block_on(executor.clone().run(function(executor)));
41+
}

tests/common.rs

Lines changed: 80 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -37,42 +37,47 @@ pub fn start_unix_server() -> PathBuf {
3737
std::fs::remove_file(&path).unwrap();
3838
}
3939

40-
let listener = UnixListener::bind(&path).unwrap();
41-
42-
in_tokio(async move {
43-
loop {
44-
let (stream, _) = listener.accept().await.unwrap();
45-
let tokio_io = TokioIo::new(stream);
46-
47-
tokio::task::spawn(async move {
48-
if let Err(err) = http1::Builder::new()
49-
.serve_connection(tokio_io, service_fn(hello_world_route))
50-
.await
51-
{
52-
eprintln!("Error serving connection: {:?}", err);
53-
}
54-
});
55-
}
56-
});
40+
let cloned_path = path.clone();
41+
42+
in_dedicated_thread(
43+
move || UnixListener::bind(&cloned_path).unwrap(),
44+
move |listener| async move {
45+
loop {
46+
let (stream, _) = listener.accept().await.unwrap();
47+
let tokio_io = TokioIo::new(stream);
48+
49+
tokio::task::spawn(async move {
50+
if let Err(err) = http1::Builder::new()
51+
.serve_connection(tokio_io, service_fn(hello_world_route))
52+
.await
53+
{
54+
eprintln!("Error serving connection: {:?}", err);
55+
}
56+
});
57+
}
58+
},
59+
);
5760

5861
path
5962
}
6063

6164
pub fn start_vsock_server() -> (u32, u32) {
6265
let port = rand::thread_rng().gen_range(10000..=65536) as u32;
63-
let mut listener = VsockListener::bind(VsockAddr::new(VMADDR_CID_LOCAL, port)).unwrap();
64-
65-
in_tokio(async move {
66-
loop {
67-
let tokio_io = TokioIo::new(listener.accept().await.unwrap().0);
68-
tokio::task::spawn(async move {
69-
http1::Builder::new()
70-
.serve_connection(tokio_io, service_fn(hello_world_route))
71-
.await
72-
.unwrap();
73-
});
74-
}
75-
});
66+
67+
in_dedicated_thread(
68+
move || VsockListener::bind(VsockAddr::new(VMADDR_CID_LOCAL, port)).unwrap(),
69+
|mut listener| async move {
70+
loop {
71+
let tokio_io = TokioIo::new(listener.accept().await.unwrap().0);
72+
tokio::task::spawn(async move {
73+
http1::Builder::new()
74+
.serve_connection(tokio_io, service_fn(hello_world_route))
75+
.await
76+
.unwrap();
77+
});
78+
}
79+
},
80+
);
7681

7782
(VMADDR_CID_LOCAL, port)
7883
}
@@ -82,45 +87,62 @@ pub fn start_firecracker_server() -> (PathBuf, u32) {
8287
if path.exists() {
8388
std::fs::remove_file(&path).unwrap();
8489
}
90+
let cloned_path = path.clone();
91+
8592
let guest_port = rand::thread_rng().gen_range(1..=1000) as u32;
8693

87-
let listener = UnixListener::bind(&path).unwrap();
88-
89-
in_tokio(async move {
90-
loop {
91-
// Recreate the CONNECT behavior of a real Firecracker socket
92-
let (mut stream, _) = listener.accept().await.unwrap();
93-
let mut buf_reader = BufReader::new(&mut stream).lines();
94-
let mut line = String::new();
95-
buf_reader.get_mut().read_line(&mut line).await.unwrap();
96-
97-
if line == format!("CONNECT {guest_port}\n") {
98-
stream.write_all(b"OK\n").await.unwrap();
99-
} else {
100-
stream.write_all(b"REJECTED\n").await.unwrap();
101-
return;
102-
}
94+
in_dedicated_thread(
95+
move || UnixListener::bind(&cloned_path).unwrap(),
96+
move |listener| async move {
97+
loop {
98+
// Recreate the CONNECT behavior of a real Firecracker socket
99+
let (mut stream, _) = listener.accept().await.unwrap();
100+
let mut buf_reader = BufReader::new(&mut stream).lines();
101+
let mut line = String::new();
102+
buf_reader.get_mut().read_line(&mut line).await.unwrap();
103+
104+
if line == format!("CONNECT {guest_port}\n") {
105+
stream.write_all(b"OK\n").await.unwrap();
106+
} else {
107+
stream.write_all(b"REJECTED\n").await.unwrap();
108+
return;
109+
}
103110

104-
// After sending out approval, serve HTTP
105-
let tokio_io = TokioIo::new(stream);
106-
tokio::task::spawn(async move {
107-
http1::Builder::new()
108-
.serve_connection(tokio_io, service_fn(hello_world_route))
109-
.await
110-
.unwrap();
111-
});
112-
}
113-
});
111+
// After sending out approval, serve HTTP
112+
let tokio_io = TokioIo::new(stream);
113+
tokio::task::spawn(async move {
114+
http1::Builder::new()
115+
.serve_connection(tokio_io, service_fn(hello_world_route))
116+
.await
117+
.unwrap();
118+
});
119+
}
120+
},
121+
);
114122

115123
(path, guest_port)
116124
}
117125

118-
fn in_tokio(future: impl Future<Output = ()> + Send + 'static) {
126+
fn in_dedicated_thread<Listener, MakeListener, Act, ActFuture>(make_listener: MakeListener, act: Act)
127+
where
128+
MakeListener: 'static + Send + FnOnce() -> Listener,
129+
Listener: Send,
130+
Act: 'static + Send + FnOnce(Listener) -> ActFuture,
131+
ActFuture: Future<Output = ()> + Send,
132+
{
133+
let (tx, rx) = std::sync::mpsc::channel();
134+
119135
std::thread::spawn(move || {
120136
tokio::runtime::Builder::new_current_thread()
121137
.enable_all()
122138
.build()
123139
.unwrap()
124-
.block_on(future)
140+
.block_on(async move {
141+
let listener = make_listener();
142+
tx.send(()).unwrap();
143+
act(listener).await;
144+
})
125145
});
146+
147+
rx.recv().unwrap();
126148
}

0 commit comments

Comments
 (0)