Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(BOUN-1219): add connection recycling, Vector fix attempt, other minor stuff #29

Merged
merged 12 commits into from
Aug 30, 2024
542 changes: 280 additions & 262 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 9 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ candid = "0.10"
chrono = "0.4"
clap = { version = "4.5", features = ["derive", "string", "env"] }
clap_derive = "4.5"
# TODO lz4 feature requires C++ compiler to build due to cityhash, fails with MUSL.
# Fork uses a pure Rust cityhash crate
clickhouse = { git = "https://github.com/dfinity/clickhouse-rs.git", default-features = false, features = [
clickhouse = { version = "0.12.2", features = [
"lz4",
"inserter",
"tls",
"uuid",
"time",
"inserter",
"rustls-tls",
] }
console-subscriber = "0.4"
ctrlc = { version = "3.4", features = ["termination"] }
derive-new = "0.6"
fqdn = "0.3"
Expand All @@ -46,7 +45,7 @@ humantime = "2.1"
hyper-util = "0.1"
ic-agent = { version = "0.37.1", features = ["reqwest"] }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b0" }
itertools = "0.13"
lazy_static = "1.5"
little-loadshedder = "0.2"
Expand Down Expand Up @@ -80,7 +79,7 @@ thiserror = "1.0"
tikv-jemallocator = "0.6"
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
time = { version = "0.3", features = ["macros", "serde"] }
tokio = { version = "1.39", features = ["full"] }
tokio = { version = "1.40", features = ["full", "tracing"] }
tokio-util = { version = "0.7.11", features = ["full"] }
tower = "0.4"
tower_governor = "0.4"
Expand Down Expand Up @@ -115,6 +114,9 @@ codegen-units = 1
lto = "fat"
panic = "abort"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[patch.crates-io]
ic-agent = { package = "ic-agent", git = "https://github.com/dfinity/agent-rs", branch = "dynamic_route", features = [
"reqwest",
Expand Down
26 changes: 23 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ pub struct HttpClient {
#[clap(env, long, default_value = "5s", value_parser = parse_duration)]
pub http_client_timeout_connect: Duration,

/// Timeout for whole HTTP call
/// Timeout for a single read request
#[clap(env, long, default_value = "15s", value_parser = parse_duration)]
pub http_client_timeout_read: Duration,

/// Timeout for the whole HTTP call: this includes connecting, sending request,
/// receiving response etc.
#[clap(env, long, default_value = "60s", value_parser = parse_duration)]
pub http_client_timeout: Duration,

Expand Down Expand Up @@ -129,6 +134,12 @@ pub struct HttpServer {
#[clap(env, long, default_value = "2048")]
pub http_server_backlog: u32,

/// Maximum number of HTTP requests to serve over a single connection.
/// After this number is reached the connection is closed.
/// The default is consistend with nginx's `keepalive_requests` parameter.
#[clap(env, long, default_value = "1000")]
pub http_server_max_requests_per_conn: u64,

/// For how long to wait for the client to send headers
/// Currently applies only to HTTP1 connections.
#[clap(env, long, default_value = "15s", value_parser = parse_duration)]
Expand All @@ -150,8 +161,9 @@ pub struct HttpServer {
#[clap(env, long, default_value = "10s", value_parser = parse_duration)]
pub http_server_http2_keepalive_timeout: Duration,

/// How long to wait for the existing connections to finish before shutting down
#[clap(env, long, default_value = "10s", value_parser = parse_duration)]
/// How long to wait for the existing connections to finish before shutting down.
/// Also applies to the the recycling of connections with `http_server_max_requests_per_conn` option.
#[clap(env, long, default_value = "60s", value_parser = parse_duration)]
pub http_server_grace_period: Duration,

/// Maximum size of cache to store TLS sessions in memory
Expand Down Expand Up @@ -341,6 +353,12 @@ pub struct Log {
#[clap(env, long)]
pub log_null: bool,

/// Enables the Tokio console.
/// It's listening on 127.0.0.1:6669
#[cfg(tokio_unstable)]
#[clap(env, long)]
pub log_tokio_console: bool,

/// Enables logging of HTTP requests to stdout/journald/null.
/// This does not affect Clickhouse/Vector logging targets -
/// if they're enabled they'll log the requests in any case.
Expand Down Expand Up @@ -510,6 +528,7 @@ impl From<&HttpServer> for http::server::Options {
http2_keepalive_timeout: c.http_server_http2_keepalive_timeout,
http2_max_streams: c.http_server_http2_max_streams,
grace_period: c.http_server_grace_period,
max_requests_per_conn: Some(c.http_server_max_requests_per_conn),
}
}
}
Expand All @@ -518,6 +537,7 @@ impl From<&HttpClient> for http::client::Options {
fn from(c: &HttpClient) -> Self {
Self {
timeout_connect: c.http_client_timeout_connect,
timeout_read: c.http_client_timeout_read,
timeout: c.http_client_timeout,
tcp_keepalive: Some(c.http_client_tcp_keepalive),
http2_keepalive: Some(c.http_client_http2_keepalive),
Expand Down
19 changes: 10 additions & 9 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
} else {
None
};
let vector = cli
.log
.vector
.log_vector_url
.as_ref()
.map(|_| Arc::new(metrics::Vector::new(&cli.log.vector, http_client.clone())));
let vector = cli.log.vector.log_vector_url.as_ref().map(|_| {
Arc::new(metrics::Vector::new(
&cli.log.vector,
http_client.clone(),
&registry,
))
});

// List of cancellable tasks to execute & track
let mut tasks = TaskManager::new();
Expand Down Expand Up @@ -124,7 +125,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {

// Set up HTTP
let http_server = Arc::new(http::Server::new(
http::server::LocalAddr::Tcp(cli.http_server.http_server_listen_plain),
http::server::Addr::Tcp(cli.http_server.http_server_listen_plain),
http_router,
(&cli.http_server).into(),
http_metrics.clone(),
Expand All @@ -133,7 +134,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
tasks.add("http_server", http_server);

let https_server = Arc::new(http::Server::new(
http::server::LocalAddr::Tcp(cli.http_server.http_server_listen_tls),
http::server::Addr::Tcp(cli.http_server.http_server_listen_tls),
https_router,
(&cli.http_server).into(),
http_metrics.clone(),
Expand All @@ -146,7 +147,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
let router = metrics::setup(&registry, tls_session_cache, &mut tasks);

let srv = Arc::new(http::Server::new(
http::server::LocalAddr::Tcp(addr),
http::server::Addr::Tcp(addr),
router,
(&cli.http_server).into(),
http_metrics,
Expand Down
10 changes: 10 additions & 0 deletions src/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ pub fn setup_logging(cli: &Log) -> Result<(), Error> {
None
};

#[cfg(tokio_unstable)]
let tokio_console_layer = if cli.log_tokio_console {
Some(console_subscriber::spawn())
} else {
None
};

let subscriber = Registry::default()
// Journald
.with(journald_layer)
Expand All @@ -142,5 +149,8 @@ pub fn setup_logging(cli: &Log) -> Result<(), Error> {
.with_filter(level_filter)
}));

#[cfg(tokio_unstable)]
let subscriber = subscriber.with(tokio_console_layer);

tracing::subscriber::set_global_default(subscriber).context("unable to set global subscriber")
}
Loading