Skip to content

Commit

Permalink
Merge pull request #29 from dfinity/igornovg/cache-metrics
Browse files Browse the repository at this point in the history
feat(BOUN-1219): add connection recycling, Vector fix attempt, other minor stuff
  • Loading branch information
blind-oracle authored Aug 30, 2024
2 parents 5ccbd70 + 30878f5 commit 9bf252e
Show file tree
Hide file tree
Showing 8 changed files with 505 additions and 320 deletions.
542 changes: 280 additions & 262 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 9 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ candid = "0.10"
chrono = "0.4"
clap = { version = "4.5", features = ["derive", "string", "env"] }
clap_derive = "4.5"
# TODO lz4 feature requires C++ compiler to build due to cityhash, fails with MUSL.
# Fork uses a pure Rust cityhash crate
clickhouse = { git = "https://github.com/dfinity/clickhouse-rs.git", default-features = false, features = [
clickhouse = { version = "0.12.2", features = [
"lz4",
"inserter",
"tls",
"uuid",
"time",
"inserter",
"rustls-tls",
] }
console-subscriber = "0.4"
ctrlc = { version = "3.4", features = ["termination"] }
derive-new = "0.6"
fqdn = "0.3"
Expand All @@ -46,7 +45,7 @@ humantime = "2.1"
hyper-util = "0.1"
ic-agent = { version = "0.37.1", features = ["reqwest"] }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b0" }
itertools = "0.13"
lazy_static = "1.5"
little-loadshedder = "0.2"
Expand Down Expand Up @@ -80,7 +79,7 @@ thiserror = "1.0"
tikv-jemallocator = "0.6"
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
time = { version = "0.3", features = ["macros", "serde"] }
tokio = { version = "1.39", features = ["full"] }
tokio = { version = "1.40", features = ["full", "tracing"] }
tokio-util = { version = "0.7.11", features = ["full"] }
tower = "0.4"
tower_governor = "0.4"
Expand Down Expand Up @@ -115,6 +114,9 @@ codegen-units = 1
lto = "fat"
panic = "abort"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[patch.crates-io]
ic-agent = { package = "ic-agent", git = "https://github.com/dfinity/agent-rs", branch = "dynamic_route", features = [
"reqwest",
Expand Down
26 changes: 23 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ pub struct HttpClient {
#[clap(env, long, default_value = "5s", value_parser = parse_duration)]
pub http_client_timeout_connect: Duration,

/// Timeout for whole HTTP call
/// Timeout for a single read request
#[clap(env, long, default_value = "15s", value_parser = parse_duration)]
pub http_client_timeout_read: Duration,

/// Timeout for the whole HTTP call: this includes connecting, sending request,
/// receiving response etc.
#[clap(env, long, default_value = "60s", value_parser = parse_duration)]
pub http_client_timeout: Duration,

Expand Down Expand Up @@ -129,6 +134,12 @@ pub struct HttpServer {
#[clap(env, long, default_value = "2048")]
pub http_server_backlog: u32,

/// Maximum number of HTTP requests to serve over a single connection.
/// After this number is reached the connection is gracefully closed.
/// The default is consistent with nginx's `keepalive_requests` parameter.
#[clap(env, long, default_value = "1000")]
pub http_server_max_requests_per_conn: u64,

/// For how long to wait for the client to send headers
/// Currently applies only to HTTP1 connections.
#[clap(env, long, default_value = "15s", value_parser = parse_duration)]
Expand All @@ -150,8 +161,9 @@ pub struct HttpServer {
#[clap(env, long, default_value = "10s", value_parser = parse_duration)]
pub http_server_http2_keepalive_timeout: Duration,

/// How long to wait for the existing connections to finish before shutting down
#[clap(env, long, default_value = "10s", value_parser = parse_duration)]
/// How long to wait for the existing connections to finish before shutting down.
/// Also applies to the recycling of connections with `http_server_max_requests_per_conn` option.
#[clap(env, long, default_value = "60s", value_parser = parse_duration)]
pub http_server_grace_period: Duration,

/// Maximum size of cache to store TLS sessions in memory
Expand Down Expand Up @@ -341,6 +353,12 @@ pub struct Log {
#[clap(env, long)]
pub log_null: bool,

/// Enables the Tokio console.
/// It's listening on 127.0.0.1:6669
#[cfg(tokio_unstable)]
#[clap(env, long)]
pub log_tokio_console: bool,

/// Enables logging of HTTP requests to stdout/journald/null.
/// This does not affect Clickhouse/Vector logging targets -
/// if they're enabled they'll log the requests in any case.
Expand Down Expand Up @@ -510,6 +528,7 @@ impl From<&HttpServer> for http::server::Options {
http2_keepalive_timeout: c.http_server_http2_keepalive_timeout,
http2_max_streams: c.http_server_http2_max_streams,
grace_period: c.http_server_grace_period,
max_requests_per_conn: Some(c.http_server_max_requests_per_conn),
}
}
}
Expand All @@ -518,6 +537,7 @@ impl From<&HttpClient> for http::client::Options {
fn from(c: &HttpClient) -> Self {
Self {
timeout_connect: c.http_client_timeout_connect,
timeout_read: c.http_client_timeout_read,
timeout: c.http_client_timeout,
tcp_keepalive: Some(c.http_client_tcp_keepalive),
http2_keepalive: Some(c.http_client_http2_keepalive),
Expand Down
19 changes: 10 additions & 9 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
} else {
None
};
let vector = cli
.log
.vector
.log_vector_url
.as_ref()
.map(|_| Arc::new(metrics::Vector::new(&cli.log.vector, http_client.clone())));
let vector = cli.log.vector.log_vector_url.as_ref().map(|_| {
Arc::new(metrics::Vector::new(
&cli.log.vector,
http_client.clone(),
&registry,
))
});

// List of cancellable tasks to execute & track
let mut tasks = TaskManager::new();
Expand Down Expand Up @@ -124,7 +125,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {

// Set up HTTP
let http_server = Arc::new(http::Server::new(
http::server::LocalAddr::Tcp(cli.http_server.http_server_listen_plain),
http::server::Addr::Tcp(cli.http_server.http_server_listen_plain),
http_router,
(&cli.http_server).into(),
http_metrics.clone(),
Expand All @@ -133,7 +134,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
tasks.add("http_server", http_server);

let https_server = Arc::new(http::Server::new(
http::server::LocalAddr::Tcp(cli.http_server.http_server_listen_tls),
http::server::Addr::Tcp(cli.http_server.http_server_listen_tls),
https_router,
(&cli.http_server).into(),
http_metrics.clone(),
Expand All @@ -146,7 +147,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
let router = metrics::setup(&registry, tls_session_cache, &mut tasks);

let srv = Arc::new(http::Server::new(
http::server::LocalAddr::Tcp(addr),
http::server::Addr::Tcp(addr),
router,
(&cli.http_server).into(),
http_metrics,
Expand Down
10 changes: 10 additions & 0 deletions src/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ pub fn setup_logging(cli: &Log) -> Result<(), Error> {
None
};

#[cfg(tokio_unstable)]
let tokio_console_layer = if cli.log_tokio_console {
Some(console_subscriber::spawn())
} else {
None
};

let subscriber = Registry::default()
// Journald
.with(journald_layer)
Expand All @@ -142,5 +149,8 @@ pub fn setup_logging(cli: &Log) -> Result<(), Error> {
.with_filter(level_filter)
}));

#[cfg(tokio_unstable)]
let subscriber = subscriber.with(tokio_console_layer);

tracing::subscriber::set_global_default(subscriber).context("unable to set global subscriber")
}
Loading

0 comments on commit 9bf252e

Please sign in to comment.