Skip to content

Commit

Permalink
Merge pull request #233 from bearcove/tls-cleanup
Browse files Browse the repository at this point in the history
Some perfstat.sh cleanups
  • Loading branch information
fasterthanlime authored Aug 30, 2024
2 parents 26be2fe + 63d2961 commit 1d42579
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 121 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ httpwg-over-tcp *args='':
cargo build --release \
--package httpwg-loona \
--package httpwg-cli
export PROTO=h2
export PROTO=h2c
export PORT=8001
export RUST_LOG=${RUST_LOG:-info}
./target/release/httpwg --frame-timeout 2000 --connect-timeout 2000 --address localhost:8001 "$@" -- ./target/release/httpwg-loona
Expand All @@ -84,7 +84,7 @@ samply:
--package httpwg-loona \
--profile profiling \
--features tracing/release_max_level_info
export PROTO=h2
export PROTO=h2c
export PORT=8002
target/profiling/httpwg-loona

Expand Down
12 changes: 12 additions & 0 deletions crates/httpwg-harness/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ impl Settings {
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
Ok(server_config)
}

pub fn message_for_404() -> &'static str {
r#"404 Not Found
This server serves the following routes:
/echo-body — Echoes back the request body.
/status/{code} — Returns a response with the specified status code.
/repeat-4k-blocks/{repeat} — Streams the specified number of 4KB blocks (from memory)
/stream-file/{name} — Streams the contents of a file from `/tmp/stream-file/{name}` — see `scripts/mkfiles.sh`
/"#
}
}

/// A sample block of 4KiB of data.
Expand Down
36 changes: 20 additions & 16 deletions crates/httpwg-hyper/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! - Any other path: Returns a 404 Not Found response.

use http_body_util::{BodyExt, StreamBody};
use httpwg_harness::SAMPLE_4K_BLOCK;
use httpwg_harness::{Settings, SAMPLE_4K_BLOCK};
use tokio::io::AsyncReadExt;

use std::{convert::Infallible, fmt::Debug, pin::Pin};
Expand Down Expand Up @@ -46,23 +46,24 @@ where
let path = parts.uri.path();
let parts = path.trim_start_matches('/').split('/').collect::<Vec<_>>();

if let ["echo-body"] = parts.as_slice() {
let body: BoxBody<E> = Box::pin(req_body);
let res = Response::builder().body(body).unwrap();
Ok(res)
} else {
let body: BoxBody<E> =
Box::pin(http_body_util::Empty::new().map_err(|_| unreachable!()));

if let ["status", code] = parts.as_slice() {
match parts.as_slice() {
["echo-body"] => {
let body: BoxBody<E> = Box::pin(req_body);
let res = Response::builder().body(body).unwrap();
Ok(res)
}
["status", code] => {
// drain body
while let Some(_frame) = req_body.frame().await {}

let code = code.parse::<u16>().unwrap();
let body: BoxBody<E> =
Box::pin(http_body_util::Empty::new().map_err(|_| unreachable!()));
let res = Response::builder().status(code).body(body).unwrap();
debug!("Replying with {:?} {:?}", res.status(), res.headers());
Ok(res)
} else if let ["repeat-4k-blocks", repeat] = parts.as_slice() {
}
["repeat-4k-blocks", repeat] => {
// drain body
while let Some(_frame) = req_body.frame().await {}

Expand All @@ -83,7 +84,8 @@ where
let body: BoxBody<E> = Box::pin(StreamBody::new(rx));
let res = Response::builder().body(body).unwrap();
Ok(res)
} else if let ["stream-file", name] = parts.as_slice() {
}
["stream-file", name] => {
// drain body
while let Some(_frame) = req_body.frame().await {}

Expand All @@ -110,20 +112,22 @@ where
let body: BoxBody<E> = Box::pin(StreamBody::new(rx));
let res = Response::builder().body(body).unwrap();
Ok(res)
} else if parts.as_slice().is_empty() {
}
[""] => {
// drain body
while let Some(_frame) = req_body.frame().await {}

let body = "it's less dire to lose, than to lose oneself".to_string();
let body = "See /help for a list of routes".to_string();
let body: BoxBody<E> = Box::pin(body.map_err(|_| unreachable!()));
let res = Response::builder().status(200).body(body).unwrap();
Ok(res)
} else {
}
_ => {
// drain body
while let Some(_frame) = req_body.frame().await {}

// return a 404
let body = "404 Not Found".to_string();
let body = Settings::message_for_404().to_string();
let body: BoxBody<E> = Box::pin(body.map_err(|_| unreachable!()));
let res = Response::builder().status(404).body(body).unwrap();
Ok(res)
Expand Down
11 changes: 6 additions & 5 deletions crates/httpwg-loona/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ tracing-subscriber = "0.3.18"
tokio = { version = "1.39.2", features = ["macros", "sync", "process"] }
eyre = { version = "0.6.12", default-features = false }
b-x = { version = "1.0.0", path = "../b-x" }
rcgen = { version = "0.13.1", default-features = false, features = ["aws_lc_rs"] }
tokio-rustls = "0.26.0"
ktls = "6.0.0"
rcgen = { version = "0.13.1", default-features = false, features = [
"aws_lc_rs",
] }
httpwg-harness = { version = "0.1.0", path = "../httpwg-harness" }
socket2 = "0.5.7"

[dev-dependencies]
codspeed-criterion-compat = "2.6.0"
[target.'cfg(target_os = "linux")'.dependencies]
ktls = "6.0.0"
tokio-rustls = "0.26.0"
19 changes: 17 additions & 2 deletions crates/httpwg-loona/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use b_x::{BxForResults, BX};
use httpwg_harness::SAMPLE_4K_BLOCK;
use httpwg_harness::{Settings, SAMPLE_4K_BLOCK};

use buffet::Piece;
use loona::{
Expand Down Expand Up @@ -93,6 +93,21 @@ where
.await
.bx()?
}
// apparently `/` gives us that
[""] => {
drain_body(req_body).await?;

let body = "See /help for a list of routes";
res.write_final_response_with_body(
Response {
status: StatusCode::OK,
..Default::default()
},
&mut SinglePieceBody::from(body),
)
.await
.bx()?
}
_ => {
drain_body(req_body).await?;

Expand All @@ -102,7 +117,7 @@ where
status: StatusCode::NOT_FOUND,
..Default::default()
},
&mut SinglePieceBody::from("404 Not Found"),
&mut SinglePieceBody::from(Settings::message_for_404()),
)
.await
.bx()?
Expand Down
96 changes: 17 additions & 79 deletions crates/httpwg-loona/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
use driver::TestDriver;
use httpwg_harness::{Proto, Settings};
use ktls::CorkStream;
use std::{
mem::ManuallyDrop,
os::fd::{AsRawFd, FromRawFd, IntoRawFd},
rc::Rc,
sync::Arc,
};
use tokio_rustls::TlsAcceptor;

use buffet::{
net::{TcpListener, TcpStream},
IntoHalves, RollMut,
};
use loona::{
error::ServeError,
h1,
h2::{self, types::H2ConnectionError},
};
use httpwg_harness::Proto;
use httpwg_harness::Settings;
use std::rc::Rc;

use buffet::net::TcpListener;
use buffet::IntoHalves;
use buffet::RollMut;
use loona::error::ServeError;
use loona::h1;
use loona::h2;
use loona::h2::types::H2ConnectionError;
use tracing::Level;
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, util::SubscriberInitExt};

mod driver;

#[cfg(target_os = "linux")]
mod tls;

fn main() {
setup_tracing_and_error_reporting();
buffet::start(real_main());
Expand Down Expand Up @@ -87,48 +82,9 @@ async fn real_main() {

#[cfg(target_os = "linux")]
Proto::TLS => {
let mut server_config = Settings::gen_rustls_server_config().unwrap();
server_config.enable_secret_extraction = true;
let driver = TestDriver;
let h1_conf = Rc::new(h1::ServerConf::default());
let h2_conf = Rc::new(h2::ServerConf::default());

// until we come up with `loona-rustls`, we need to temporarily go through a
// tokio TcpStream
let acceptor = TlsAcceptor::from(Arc::new(server_config));
let stream = unsafe { std::net::TcpStream::from_raw_fd(stream.into_raw_fd()) };
stream.set_nonblocking(true).unwrap();
let stream = tokio::net::TcpStream::from_std(stream)?;
let stream = CorkStream::new(stream);
let stream = acceptor.accept(stream).await?;

let is_h2 = matches!(stream.get_ref().1.alpn_protocol(), Some(b"h2"));
tracing::debug!(%is_h2, "Performed TLS handshake");

let stream = ktls::config_ktls_server(stream).await?;

tracing::debug!("Set up kTLS");
let (drained, stream) = stream.into_raw();
let drained = drained.unwrap_or_default();
tracing::debug!("{} bytes already decoded by rustls", drained.len());

// and back to a buffet TcpStream
let stream = stream.to_uring_tcp_stream()?;

let mut client_buf = RollMut::alloc()?;
client_buf.put(&drained[..])?;

if is_h2 {
tracing::info!("Using HTTP/2");
h2::serve(stream.into_halves(), h2_conf, client_buf, Rc::new(driver))
.await
.map_err(|e| eyre::eyre!("h2 server error: {e:?}"))?;
} else {
tracing::info!("Using HTTP/1.1");
h1::serve(stream.into_halves(), h1_conf, client_buf, driver)
.await
.map_err(|e| eyre::eyre!("h1 server error: {e:?}"))?;
}
tls::handle_tls_conn(stream)
.await
.map_err(|e| eyre::eyre!("tls error: {e:?}"))?;
}
}
Ok::<_, eyre::Report>(())
Expand Down Expand Up @@ -164,21 +120,3 @@ fn setup_tracing_and_error_reporting() {
.with(fmt_layer)
.init();
}

pub trait ToUringTcpStream {
fn to_uring_tcp_stream(self) -> std::io::Result<TcpStream>;
}

impl ToUringTcpStream for tokio::net::TcpStream {
fn to_uring_tcp_stream(self) -> std::io::Result<TcpStream> {
{
let sock = ManuallyDrop::new(unsafe { socket2::Socket::from_raw_fd(self.as_raw_fd()) });
// tokio needs the socket to be "non-blocking" (as in: return EAGAIN)
// buffet needs it to be "blocking" (as in: let io_uring do the op async)
sock.set_nonblocking(false)?;
}
let stream = unsafe { TcpStream::from_raw_fd(self.as_raw_fd()) };
std::mem::forget(self);
Ok(stream)
}
}
78 changes: 78 additions & 0 deletions crates/httpwg-loona/src/tls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use b_x::BxForResults;
use buffet::net::TcpStream;
use buffet::IntoHalves;
use buffet::RollMut;
use httpwg_harness::Settings;
use ktls::CorkStream;
use loona::h1;
use loona::h2;
use std::mem::ManuallyDrop;
use std::os::fd::AsRawFd;
use std::os::fd::FromRawFd;
use std::os::fd::IntoRawFd;
use std::rc::Rc;
use std::sync::Arc;
use tokio_rustls::TlsAcceptor;

use crate::driver::TestDriver;

pub(super) async fn handle_tls_conn(stream: TcpStream) -> b_x::Result<()> {
let mut server_config = Settings::gen_rustls_server_config().unwrap();
server_config.enable_secret_extraction = true;
let driver = TestDriver;
let h1_conf = Rc::new(h1::ServerConf::default());
let h2_conf = Rc::new(h2::ServerConf::default());

// until we come up with `loona-rustls`, we need to temporarily go through a
// tokio TcpStream
let acceptor = TlsAcceptor::from(Arc::new(server_config));
let stream = unsafe { std::net::TcpStream::from_raw_fd(stream.into_raw_fd()) };
stream.set_nonblocking(true).unwrap();
let stream = tokio::net::TcpStream::from_std(stream)?;
let stream = CorkStream::new(stream);
let stream = acceptor.accept(stream).await?;

let is_h2 = matches!(stream.get_ref().1.alpn_protocol(), Some(b"h2"));
tracing::debug!(%is_h2, "Performed TLS handshake");

let stream = ktls::config_ktls_server(stream).await.bx()?;

tracing::debug!("Set up kTLS");
let (drained, stream) = stream.into_raw();
let drained = drained.unwrap_or_default();
tracing::debug!("{} bytes already decoded by rustls", drained.len());

// and back to a buffet TcpStream
let stream = stream.to_uring_tcp_stream()?;

let mut client_buf = RollMut::alloc()?;
client_buf.put(&drained[..])?;

if is_h2 {
tracing::info!("Using HTTP/2");
h2::serve(stream.into_halves(), h2_conf, client_buf, Rc::new(driver)).await?;
} else {
tracing::info!("Using HTTP/1.1");
h1::serve(stream.into_halves(), h1_conf, client_buf, driver).await?;
}
Ok(())
}

pub trait ToUringTcpStream {
fn to_uring_tcp_stream(self) -> std::io::Result<TcpStream>;
}

impl ToUringTcpStream for tokio::net::TcpStream {
fn to_uring_tcp_stream(self) -> std::io::Result<TcpStream> {
{
let sock = ManuallyDrop::new(unsafe { socket2::Socket::from_raw_fd(self.as_raw_fd()) });
// tokio needs the socket to be "non-blocking" (as in: return
// EAGAIN) buffet needs it to be
// "blocking" (as in: let io_uring do the op async)
sock.set_nonblocking(false)?;
}
let stream = unsafe { TcpStream::from_raw_fd(self.as_raw_fd()) };
std::mem::forget(self);
Ok(stream)
}
}
Loading

0 comments on commit 1d42579

Please sign in to comment.