Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
390 changes: 365 additions & 25 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ hyper = { version = "1.6.0", default-features = false }
hyper-util = { version = "0.1.16", default-features = false }
libc = { version = "0.2.159", default-features = false }
log = { version = "0.4.22", default-features = false }
pprof = { version = "0.15.0", features = ["prost-codec"] }
prometheus-client = { version = "0.24.0", default-features = false }
prost = "0.13.5"
prost-types = "0.13.5"
Expand Down
1 change: 1 addition & 0 deletions fact/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ log = { workspace = true }
tonic = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
pprof = { workspace = true }
prometheus-client = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
Expand Down
26 changes: 26 additions & 0 deletions fact/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct FactConfig {
paths: Option<Vec<PathBuf>>,
url: Option<String>,
certs: Option<PathBuf>,
expose_profiler: Option<bool>,
expose_metrics: Option<bool>,
health_check: Option<bool>,
skip_pre_flight: Option<bool>,
Expand Down Expand Up @@ -70,6 +71,10 @@ impl FactConfig {
self.certs = Some(certs.to_owned());
}

if let Some(expose_profiler) = from.expose_profiler {
self.expose_profiler = Some(expose_profiler);
}

if let Some(expose_metrics) = from.expose_metrics {
self.expose_metrics = Some(expose_metrics);
}
Expand Down Expand Up @@ -103,6 +108,10 @@ impl FactConfig {
self.certs.as_deref()
}

pub fn expose_profiler(&self) -> bool {
self.expose_profiler.unwrap_or(false)
}

pub fn expose_metrics(&self) -> bool {
self.expose_metrics.unwrap_or(false)
}
Expand Down Expand Up @@ -195,6 +204,12 @@ impl TryFrom<Vec<Yaml>> for FactConfig {
};
config.certs = Some(PathBuf::from(certs));
}
"expose_profiler" => {
let Some(em) = v.as_bool() else {
bail!("expose_profiler field has incorrect type: {v:?}");
};
config.expose_profiler = Some(em);
}
"expose_metrics" => {
let Some(em) = v.as_bool() else {
bail!("expose_metrics field has incorrect type: {v:?}");
Expand Down Expand Up @@ -255,6 +270,16 @@ pub struct FactCli {
#[arg(short, long, env = "FACT_CERTS")]
certs: Option<PathBuf>,

/// Whether pprof profiler should be exposed
#[arg(
long,
overrides_with("no_expose_profiler"),
env = "FACT_EXPOSE_PROFILER"
)]
expose_profiler: bool,
#[arg(long, overrides_with = "expose_profiler", hide(true))]
no_expose_profiler: bool,

/// Whether prometheus metrics should be collected and exposed
#[arg(long, overrides_with("no_expose_metrics"), env = "FACT_EXPOSE_METRICS")]
expose_metrics: bool,
Expand Down Expand Up @@ -301,6 +326,7 @@ impl FactCli {
paths: self.paths.clone(),
url: self.url.clone(),
certs: self.certs.clone(),
expose_profiler: resolve_bool_arg(self.expose_profiler, self.no_expose_profiler),
expose_metrics: resolve_bool_arg(self.expose_metrics, self.no_expose_metrics),
health_check: resolve_bool_arg(self.health_check, self.no_health_check),
skip_pre_flight: resolve_bool_arg(self.skip_pre_flight, self.no_skip_pre_flight),
Expand Down
54 changes: 54 additions & 0 deletions fact/src/config/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ fn parsing() {
..Default::default()
},
),
(
"expose_profiler: true",
FactConfig {
expose_profiler: Some(true),
..Default::default()
},
),
(
"expose_profiler: false",
FactConfig {
expose_profiler: Some(false),
..Default::default()
},
),
(
"expose_metrics: true",
FactConfig {
Expand Down Expand Up @@ -101,6 +115,7 @@ fn parsing() {
- /etc
url: https://svc.sensor.stackrox:9090
certs: /etc/stackrox/certs
expose_profiler: true
expose_metrics: true
health_check: true
skip_pre_flight: false
Expand All @@ -111,6 +126,7 @@ fn parsing() {
paths: Some(vec![PathBuf::from("/etc")]),
url: Some(String::from("https://svc.sensor.stackrox:9090")),
certs: Some(PathBuf::from("/etc/stackrox/certs")),
expose_profiler: Some(true),
expose_metrics: Some(true),
health_check: Some(true),
skip_pre_flight: Some(false),
Expand Down Expand Up @@ -151,6 +167,10 @@ paths:
"certs: true",
"certs field has incorrect type: Boolean(true)",
),
(
"expose_profiler: 4",
"expose_profiler field has incorrect type: Integer(4)",
),
(
"expose_metrics: 4",
"expose_metrics field has incorrect type: Integer(4)",
Expand Down Expand Up @@ -298,6 +318,36 @@ fn update() {
..Default::default()
},
),
(
"expose_profiler: true",
FactConfig::default(),
FactConfig {
expose_profiler: Some(true),
..Default::default()
},
),
(
"expose_profiler: true",
FactConfig {
expose_profiler: Some(false),
..Default::default()
},
FactConfig {
expose_profiler: Some(true),
..Default::default()
},
),
(
"expose_profiler: true",
FactConfig {
expose_profiler: Some(true),
..Default::default()
},
FactConfig {
expose_profiler: Some(true),
..Default::default()
},
),
(
"expose_metrics: true",
FactConfig::default(),
Expand Down Expand Up @@ -424,6 +474,7 @@ fn update() {
- /etc
url: https://svc.sensor.stackrox:9090
certs: /etc/stackrox/certs
expose_profiler: true
expose_metrics: true
health_check: true
skip_pre_flight: false
Expand All @@ -434,6 +485,7 @@ fn update() {
paths: Some(vec![PathBuf::from("/etc"), PathBuf::from("/bin")]),
url: Some(String::from("http://localhost")),
certs: Some(PathBuf::from("/etc/certs")),
expose_profiler: Some(false),
expose_metrics: Some(false),
health_check: Some(false),
skip_pre_flight: Some(true),
Expand All @@ -444,6 +496,7 @@ fn update() {
paths: Some(vec![PathBuf::from("/etc")]),
url: Some(String::from("https://svc.sensor.stackrox:9090")),
certs: Some(PathBuf::from("/etc/stackrox/certs")),
expose_profiler: Some(true),
expose_metrics: Some(true),
health_check: Some(true),
skip_pre_flight: Some(false),
Expand All @@ -469,6 +522,7 @@ fn defaults() {
assert_eq!(config.paths(), default_paths);
assert_eq!(config.url(), None);
assert_eq!(config.certs(), None);
assert!(!config.expose_profiler());
assert!(!config.expose_metrics());
assert!(!config.health_check());
assert!(!config.skip_pre_flight());
Expand Down
140 changes: 118 additions & 22 deletions fact/src/endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{future::Future, net::SocketAddr, pin::Pin};

use http_body_util::Full;
use http_body_util::{BodyExt, Full};
use hyper::{
body::{Bytes, Incoming},
server::conn::http1,
Expand All @@ -12,25 +12,41 @@ use log::{info, warn};
use tokio::{net::TcpListener, sync::watch, task::JoinHandle};

use crate::metrics::exporter::Exporter;
use crate::profiler::Profiler;

type ServerResponse = anyhow::Result<Response<Full<Bytes>>>;

#[derive(Clone)]
pub struct Server {
metrics: Option<Exporter>,
health_check: bool,
profiler: Option<Profiler>,
}

impl Server {
pub fn new(metrics: Exporter, expose_metrics: bool, health_check: bool) -> Self {
pub fn new(
metrics: Exporter,
expose_profiler: bool,
expose_metrics: bool,
health_check: bool,
) -> Self {
let metrics = if expose_metrics { Some(metrics) } else { None };
let profiler = if expose_profiler {
Some(Profiler::new())
} else {
None
};

Server {
metrics,
health_check,
profiler,
}
}

pub fn start(self, mut running: watch::Receiver<bool>) -> Option<JoinHandle<()>> {
// If there is nothing to expose, we don't run the hyper server
if self.metrics.is_none() && !self.health_check {
if self.metrics.is_none() && self.profiler.is_none() && !self.health_check {
return None;
}

Expand Down Expand Up @@ -63,39 +79,116 @@ impl Server {
Some(handle)
}

fn make_response(
fn response(res: StatusCode, body: impl Into<Bytes>) -> ServerResponse {
Response::builder()
.status(res)
.body(Full::new(body.into()))
.map_err(anyhow::Error::new)
}

fn response_with_content_type(
res: StatusCode,
body: String,
) -> Result<Response<Full<Bytes>>, anyhow::Error> {
Ok(Response::builder()
content_type: &str,
body: impl Into<Bytes>,
) -> ServerResponse {
Response::builder()
.status(res)
.body(Full::new(Bytes::from(body)))
.unwrap())
.header(hyper::header::CONTENT_TYPE, content_type)
.body(Full::new(body.into()))
.map_err(anyhow::Error::new)
}

fn handle_metrics(&self) -> Result<Response<Full<Bytes>>, anyhow::Error> {
fn handle_metrics(&self) -> ServerResponse {
match &self.metrics {
Some(metrics) => metrics.encode().map(|buf| {
let body = Full::new(Bytes::from(buf));
Response::builder()
.header(
hyper::header::CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(body)
.map_err(anyhow::Error::new)
Server::response_with_content_type(
StatusCode::OK,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
buf,
)
})?,
None => Server::make_response(StatusCode::SERVICE_UNAVAILABLE, String::new()),
None => Server::response(StatusCode::SERVICE_UNAVAILABLE, ""),
}
}

fn handle_health_check(&self) -> Result<Response<Full<Bytes>>, anyhow::Error> {
fn handle_health_check(&self) -> ServerResponse {
let res = if self.health_check {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
Server::make_response(res, String::new())
Server::response(res, "")
}

async fn handle_profiler_status(&self) -> ServerResponse {
let Some(profiler) = &self.profiler else {
return Server::response(StatusCode::INTERNAL_SERVER_ERROR, "Profiler is not enabled");
};
let body = profiler.get_status().await;
Server::response_with_content_type(StatusCode::OK, "application/json", body)
}

async fn handle_cpu_profiler(&self, body: Incoming) -> ServerResponse {
let Some(profiler) = &self.profiler else {
return Server::response(StatusCode::INTERNAL_SERVER_ERROR, "Profiler is not enabled");
};

let body = match body.collect().await {
Ok(b) => b.to_bytes(),
Err(e) => {
return Server::response(
StatusCode::BAD_REQUEST,
format!("Failed to read request body: {e}"),
)
}
};

if body == "on" {
match profiler.start().await {
Ok(_) => Server::response_with_content_type(
StatusCode::OK,
"text/plain",
"CPU profiler starter",
),
Err(e) => Server::response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to start CPU profiler: {e}"),
),
}
} else if body == "off" {
match profiler.stop().await {
Ok(_) => Server::response_with_content_type(
StatusCode::OK,
"text/plain",
"CPU profiler stopped",
),
Err(e) => Server::response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to stop CPU profiler: {e}"),
),
}
} else {
Server::response(
StatusCode::BAD_REQUEST,
format!("Invalid request body: {body:?}"),
)
}
}

async fn handle_cpu_report(&self) -> ServerResponse {
let Some(profiler) = &self.profiler else {
return Server::response(StatusCode::INTERNAL_SERVER_ERROR, "Profiler is not enabled");
};

match profiler.get().await {
Ok(profile) => {
Server::response_with_content_type(StatusCode::OK, "text/plain", profile)
}
Err(e) => Server::response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to get CPU profile: {e}"),
),
}
}
}

Expand All @@ -110,7 +203,10 @@ impl Service<Request<Incoming>> for Server {
match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => s.handle_metrics(),
(&Method::GET, "/health_check") => s.handle_health_check(),
_ => Server::make_response(StatusCode::NOT_FOUND, String::new()),
(&Method::POST, "/profile/cpu") => s.handle_cpu_profiler(req.into_body()).await,
(&Method::GET, "/profile/cpu") => s.handle_cpu_report().await,
(&Method::GET, "/profile") => s.handle_profiler_status().await,
_ => Server::response(StatusCode::NOT_FOUND, ""),
}
})
}
Expand Down
Loading
Loading