Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ log = { version = "0.4.22", default-features = false }
prometheus-client = { version = "0.24.0", default-features = false }
prost = "0.13.5"
prost-types = "0.13.5"
serde = { version = "1.0.219", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive", "rc"] }
serde_json = "1.0.142"
tokio = { version = "1.40.0", default-features = false, features = [
"macros",
Expand Down
1 change: 0 additions & 1 deletion fact-ebpf/src/bpf/maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
struct helper_t {
char buf[PATH_MAX * 2];
const unsigned char* array[16];
};

struct {
Expand Down
71 changes: 1 addition & 70 deletions fact-ebpf/src/bpf/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,71 +11,6 @@
#include <bpf/bpf_core_read.h>
// clang-format on

/*
 * Build the cgroup path of the current task (via the memory controller's
 * kernfs node) into helper->buf and return it, or NULL when it cannot be
 * determined. The path is written root-first as "/comp/comp/...".
 */
__always_inline static const char* get_memory_cgroup(struct helper_t* helper) {
  /* Bail out early on kernels whose cgroup_subsys_id enum lacks the memory
   * controller value. */
  if (!bpf_core_enum_value_exists(enum cgroup_subsys_id, memory_cgrp_id)) {
    return NULL;
  }

  struct task_struct* task = (struct task_struct*)bpf_get_current_task();

  // We're guessing which cgroup controllers are enabled for this task. The
  // assumption is that memory controller is present more often than
  // cpu & cpuacct.
  struct kernfs_node* kn = BPF_CORE_READ(task, cgroups, subsys[memory_cgrp_id], cgroup, kn);
  if (kn == NULL) {
    return NULL;
  }

  // Walk up the kernfs tree, collecting up to 16 component-name pointers
  // (leaf first) into helper->array.
  int i = 0;
  for (; i < 16; i++) {
    helper->array[i] = (const unsigned char*)BPF_CORE_READ(kn, name);
    // The parent pointer is named __parent on some kernel versions and
    // parent on others; the CO-RE field-existence check picks the layout
    // that matches the running kernel.
    if (bpf_core_field_exists(kn->__parent)) {
      kn = BPF_CORE_READ(kn, __parent);
    } else {
      struct {
        struct kernfs_node* parent;
      }* kn_old = (void*)kn;
      kn = BPF_CORE_READ(kn_old, parent);
    }
    if (kn == NULL) {
      break;
    }
  }

  // If the loop ran all 16 iterations, i is one past the last filled slot;
  // step back so the write loop below starts on a valid index.
  if (i == 16) {
    i--;
  }

  // Emit components from root to leaf. The `& (PATH_MAX - 1)` masks keep
  // the index provably within bounds for the BPF verifier.
  int offset = 0;
  for (; i >= 0 && offset < PATH_MAX; i--) {
    // Skip empty directories
    if (helper->array[i] == NULL) {
      continue;
    }

    helper->buf[offset & (PATH_MAX - 1)] = '/';
    if (++offset >= PATH_MAX) {
      return NULL;
    }

    int len = bpf_probe_read_kernel_str(&helper->buf[offset & (PATH_MAX - 1)], PATH_MAX, helper->array[i]);
    if (len < 0) {
      // We should have skipped all empty entries, any other error is a genuine
      // problem, stop processing.
      return NULL;
    }

    // len == 1 means only a NUL terminator was copied (empty name); undo the
    // separator written for it.
    if (len == 1) {
      offset--;
      continue;
    }

    // len counts the NUL terminator; leave offset on it so the next
    // component's '/' overwrites it.
    offset += len - 1;
  }

  return helper->buf;
}

__always_inline static void process_fill_lineage(process_t* p, struct helper_t* helper) {
struct task_struct* task = (struct task_struct*)bpf_get_current_task();
struct path path;
Expand Down Expand Up @@ -112,6 +47,7 @@ __always_inline static int64_t process_fill(process_t* p) {
p->gid = (uid_gid >> 32) & 0xFFFFFFFF;
p->login_uid = BPF_CORE_READ(task, loginuid.val);
p->pid = (bpf_get_current_pid_tgid() >> 32) & 0xFFFFFFFF;
p->cgroup_id = bpf_get_current_cgroup_id();
u_int64_t err = bpf_get_current_comm(p->comm, TASK_COMM_LEN);
if (err != 0) {
bpf_printk("Failed to fill task comm");
Expand Down Expand Up @@ -144,11 +80,6 @@ __always_inline static int64_t process_fill(process_t* p) {
}
bpf_probe_read_str(p->exe_path, PATH_MAX, exe_path);

const char* cg = get_memory_cgroup(helper);
if (cg != NULL) {
bpf_probe_read_str(p->memory_cgroup, PATH_MAX, cg);
}

p->in_root_mount_ns = get_mount_ns() == host_mount_ns;

process_fill_lineage(p, helper);
Expand Down
2 changes: 1 addition & 1 deletion fact-ebpf/src/bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ typedef struct process_t {
char args[4096];
unsigned int args_len;
char exe_path[PATH_MAX];
char memory_cgroup[PATH_MAX];
unsigned long long cgroup_id;
unsigned int uid;
unsigned int gid;
unsigned int login_uid;
Expand Down
20 changes: 15 additions & 5 deletions fact/src/bpf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use tokio::{
task::JoinHandle,
};

use crate::{config::FactConfig, event::Event, host_info, metrics::EventCounter};
use crate::{
cgroup::ContainerIdCache, config::FactConfig, event::Event, host_info, metrics::EventCounter,
};

use fact_ebpf::{event_t, metrics_t, path_prefix_t, LPM_SIZE_MAX};

Expand Down Expand Up @@ -98,6 +100,7 @@ impl Bpf {
mut fd: AsyncFd<RingBuf<MapData>>,
mut running: Receiver<bool>,
event_counter: EventCounter,
cid_cache: ContainerIdCache,
) -> JoinHandle<()> {
info!("Starting BPF worker...");
tokio::spawn(async move {
Expand All @@ -108,7 +111,7 @@ impl Bpf {
let ringbuf = guard.get_inner_mut();
while let Some(event) = ringbuf.next() {
let event: &event_t = unsafe { &*(event.as_ptr() as *const _) };
let event = match Event::try_from(event) {
let event = match Event::new(event, &cid_cache).await {
Ok(event) => Arc::new(event),
Err(e) => {
error!("Failed to parse event: '{e}'");
Expand Down Expand Up @@ -173,15 +176,22 @@ mod bpf_tests {
let (run_tx, run_rx) = watch::channel(true);
// Create a metrics exporter, but don't start it
let exporter = Exporter::new(bpf.get_metrics().unwrap());

Bpf::start_worker(tx, bpf.fd, run_rx, exporter.metrics.bpf_worker.clone());
let cid_cache = ContainerIdCache::new();

Bpf::start_worker(
tx,
bpf.fd,
run_rx,
exporter.metrics.bpf_worker.clone(),
cid_cache,
);

// Create a file
let file =
NamedTempFile::new_in(monitored_path).expect("Failed to create temporary file");
println!("Created {file:?}");

let expected = Event::new(
let expected = Event::from_raw_parts(
file_activity_type_t::FILE_ACTIVITY_CREATION,
host_info::get_hostname(),
file.path().to_path_buf(),
Expand Down
174 changes: 174 additions & 0 deletions fact/src/cgroup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use std::{
collections::HashMap,
os::unix::fs::DirEntryExt,
path::PathBuf,
sync::Arc,
time::{Duration, SystemTime},
};

use log::warn;
use tokio::{
sync::{watch::Receiver, Mutex},
task::JoinHandle,
time,
};

use crate::host_info::get_cgroup_paths;

/// Cache entry for one cgroup directory, keyed (elsewhere) by inode number.
#[derive(Debug)]
struct ContainerIdEntry {
    /// Container ID derived from the cgroup directory name, or inherited
    /// from the nearest ancestor cgroup; `None` when no container applies.
    container_id: Option<Arc<String>>,
    // Not `pub`: the struct itself is private to this module, so the field
    // visibility modifier was meaningless and misleading.
    /// Last time this cgroup was observed on disk; stale entries are pruned.
    last_seen: SystemTime,
}

/// Maps a cgroup directory's inode number to its cached container-ID entry.
type ContainerIdMap = HashMap<u64, ContainerIdEntry>;

/// Shared, async-locked cache of cgroup-inode -> container-ID mappings.
/// `Clone` is cheap: clones share the same underlying map via the `Arc`.
#[derive(Debug, Clone, Default)]
pub struct ContainerIdCache(Arc<Mutex<ContainerIdMap>>);

impl ContainerIdCache {
/// Build a cache that is immediately populated from the cgroupfs roots.
pub fn new() -> Self {
    let mut entries = HashMap::new();
    ContainerIdCache::update_unlocked(&mut entries);
    ContainerIdCache(Arc::new(Mutex::new(entries)))
}

/// Refresh `map` in place by walking every cgroupfs root.
/// "unlocked" means the caller is responsible for holding the lock (or
/// otherwise having exclusive access to `map`) before calling.
fn update_unlocked(map: &mut ContainerIdMap) {
    for root in get_cgroup_paths() {
        ContainerIdCache::walk_cgroupfs(&root, map, None);
    }
}

/// Take the lock and refresh the cached entries from cgroupfs.
async fn update(&mut self) {
    ContainerIdCache::update_unlocked(&mut *self.0.lock().await);
}

async fn prune(&mut self) {
let now = SystemTime::now();
self.0.lock().await.retain(|_, value| {
now.duration_since(value.last_seen).unwrap() < Duration::from_secs(30)
Copy link

@JoukoVirtanen JoukoVirtanen Nov 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe set a variable to 30 seconds and maybe pass that variable to this method to make this more configurable. Also maybe change the name to something like prun_old.

})
}

/// Look up the container ID for `cgroup_id`. On a cache miss the cache is
/// refreshed from cgroupfs once and the lookup retried, since the cgroup
/// may have been created after the last scan.
pub async fn get_container_id(&self, cgroup_id: u64) -> Option<Arc<String>> {
    let mut map = self.0.lock().await;
    if let Some(entry) = map.get(&cgroup_id) {
        return entry.container_id.clone();
    }
    // Miss: rescan cgroupfs while still holding the lock, then retry.
    ContainerIdCache::update_unlocked(&mut map);
    map.get(&cgroup_id)?.container_id.clone()
}

/// Spawn a background task that periodically refreshes and prunes the
/// cache, exiting when `running` flips to `false`.
pub fn start_worker(mut self, mut running: Receiver<bool>) -> JoinHandle<()> {
    // Named constant instead of an inline magic number; how often the cache
    // is refreshed from cgroupfs and pruned of stale entries.
    const UPDATE_PERIOD: time::Duration = time::Duration::from_secs(30);

    tokio::spawn(async move {
        // Note: the first tick of an interval completes immediately, so the
        // cache is refreshed once right after startup.
        let mut update_interval = time::interval(UPDATE_PERIOD);
        loop {
            tokio::select! {
                _ = update_interval.tick() => {
                    self.update().await;
                    self.prune().await;
                },
                _ = running.changed() => {
                    if !*running.borrow() {
                        return;
                    }
                }
            }
        }
    })
}

fn walk_cgroupfs(path: &PathBuf, map: &mut ContainerIdMap, parent_id: Option<Arc<String>>) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename to something like walk_cgroupfs_and_add_to_map. The current name doesn't explain what is done while we walk through the cgroupfs.

for entry in std::fs::read_dir(path).unwrap() {
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
warn!("Failed to read {}: {e}", path.display());
continue;
}
};

let p = entry.path();
if !p.is_dir() {
continue;
}

let container_id = match map.get_mut(&entry.ino()) {
Some(e) => {
e.last_seen = SystemTime::now();
e.container_id.clone()
}
None => {
let last_component = p
.file_name()
.map(|f| f.to_str().unwrap_or(""))
.unwrap_or("");
let container_id = match ContainerIdCache::extract_container_id(last_component)
{
Some(cid) => Some(Arc::new(cid)),
None => parent_id.clone(),
};
let last_seen = SystemTime::now();
map.insert(
entry.ino(),
ContainerIdEntry {
container_id: container_id.clone(),
last_seen,
},
);
container_id
}
};
ContainerIdCache::walk_cgroupfs(&p, map, container_id);
}
}

/// Extract a short (12-character) container ID from a cgroup directory name,
/// e.g. "cri-containerd-<64 hex chars>.scope" -> the first 12 hex characters.
/// Returns `None` when the name does not end in a 64-character hex ID,
/// optionally preceded by a runtime prefix ending in '-'.
pub fn extract_container_id(cgroup: &str) -> Option<String> {
    // Systemd-managed cgroups carry a ".scope" suffix; ignore it.
    let name = cgroup.strip_suffix(".scope").unwrap_or(cgroup);

    // A full container ID is exactly 64 hex characters at the end of the
    // name; shorter names (including the empty string) cannot contain one.
    let split_point = name.len().checked_sub(64)?;
    let (prefix, id) = name.split_at(split_point);

    // Whatever precedes the ID must be a runtime prefix ending in '-'
    // (e.g. "cri-containerd-", "libpod-conmon-") or nothing at all.
    if !(prefix.is_empty() || prefix.ends_with('-')) {
        return None;
    }

    id.chars()
        .all(|c| c.is_ascii_hexdigit())
        .then(|| id[..12].to_owned())
}
}

#[cfg(test)]
mod tests {
    use super::*;

    // Table-driven check of container-ID extraction against directory names
    // observed under real runtimes: bare containerd/docker hashes, CRI
    // scopes, kubelet pod slices, podman/libpod conmon scopes, plain systemd
    // scopes, and flatpak app scopes (the last three must NOT match).
    #[test]
    fn extract_container_id() {
        let tests = [
            ("e73c55f3e7f5b6a9cfc32a89bf13e44d348bcc4fa7b079f804d61fb1532ddbe5", Some("e73c55f3e7f5")),
            ("cri-containerd-219d7afb8e7450929eaeb06f2d27cbf7183bfa5b55b7275696f3df4154a979af.scope", Some("219d7afb8e74")),
            ("kubelet-kubepods-burstable-pod469726a5_079d_4d15_a259_1f654b534b44.slice", None),
            ("libpod-conmon-a2d2a36121868d946af912b931fc5f6b42bf84c700cef67784422b1e2c8585ee.scope", Some("a2d2a3612186")),
            ("init.scope", None),
            ("app-flatpak-com.github.IsmaelMartinez.teams_for_linux-384393947.scope", None),
        ];

        for (cgroup, expected) in tests {
            let cid = ContainerIdCache::extract_container_id(cgroup);
            assert_eq!(cid.as_deref(), expected);
        }
    }
}
Loading
Loading