Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111,665 changes: 111,665 additions & 0 deletions core/src/components/conntracker/src/bindings.rs

Large diffs are not rendered by default.

17 changes: 11 additions & 6 deletions core/src/components/identity/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,21 @@ async fn main() -> Result<(), anyhow::Error> {
std::result::Result::Ok(_) => {
info!("maps pinned successfully");
//load veth_trace program ref veth_trace.rs
init_veth_tracer(bpf.clone()).await?;
{
init_veth_tracer(bpf.clone()).await?;
}

let interfaces = get_veth_channels();

info!("Found interfaces: {:?}", interfaces);
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone())
.await
.context(
"An error occured during the execution of attach_bpf_program function",
)?;

{
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone())
.await
.context(
"An error occured during the execution of attach_bpf_program function",
)?;
}

event_listener(bpf_maps, link_ids.clone(), bpf.clone())
.await
Expand Down
1 change: 1 addition & 0 deletions core/src/components/metrics/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ COPY metrics_tracer /usr/src/cortexbrain-metrics/metrics_tracer

# Set environment variable
ENV BPF_PATH="/usr/src/cortexbrain-metrics/metrics_tracer"
ENV PIN_MAP_PATH="/sys/fs/bpf/trace_maps"

# Default command
CMD ["cortexflow-metrics"]
136 changes: 129 additions & 7 deletions core/src/components/metrics/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
use aya::{
maps::{
MapData,
perf::{PerfEventArrayBuffer},
}
};
use aya::{maps::{
perf::PerfEventArrayBuffer, Map, MapData, PerfEventArray
}, util::online_cpus};

use bytes::BytesMut;
use tokio::signal;
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};

use tracing::{error, info};

use crate::structs::NetworkMetrics;
use crate::structs::TimeStampMetrics;

pub async fn display_metrics_map(
mut perf_buffers: Vec<PerfEventArrayBuffer<MapData>>,
running: AtomicBool,
running: Arc<AtomicBool>, // Changed to Arc<AtomicBool>
mut buffers: Vec<BytesMut>,
) {
info!("Starting metrics event listener...");
while running.load(Ordering::SeqCst) {
for buf in perf_buffers.iter_mut() {
match buf.read_events(&mut buffers) {
std::result::Result::Ok(events) => {
if events.read > 0 {
info!("Read {} metric events", events.read);
}
for i in 0..events.read {
let data = &buffers[i];
if data.len() >= std::mem::size_of::<NetworkMetrics>() {
Expand All @@ -41,6 +45,8 @@ pub async fn display_metrics_map(
"sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}",
sk_drop_count, sk_err, sk_err_soft, sk_backlog_len, sk_write_memory_queued, sk_ack_backlog, sk_receive_buffer_size
);
} else {
info!("Received data too small: {} bytes, expected: {}", data.len(), std::mem::size_of::<NetworkMetrics>());
}
}
}
Expand All @@ -51,4 +57,120 @@ pub async fn display_metrics_map(
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
info!("Metrics event listener stopped");
}

pub async fn display_time_stamp_events_map(
mut perf_buffers: Vec<PerfEventArrayBuffer<MapData>>,
running: Arc<AtomicBool>, // Changed to Arc<AtomicBool>
mut buffers: Vec<BytesMut>,
) {
info!("Starting timestamp event listener...");
while running.load(Ordering::SeqCst) {
for buf in perf_buffers.iter_mut() {
match buf.read_events(&mut buffers) {
std::result::Result::Ok(events) => {
if events.read > 0 {
info!("Read {} timestamp events", events.read);
}
for i in 0..events.read {
let data = &buffers[i];
if data.len() >= std::mem::size_of::<TimeStampMetrics>() {
let time_stamp_event: TimeStampMetrics =
unsafe { std::ptr::read_unaligned(data.as_ptr() as *const _) };
let delta_us = time_stamp_event.delta_us;
let ts_us = time_stamp_event.ts_us;
let tgid = time_stamp_event.tgid;
let comm = String::from_utf8_lossy(&time_stamp_event.comm);
let lport = time_stamp_event.lport;
let dport_be = time_stamp_event.dport_be;
let af = time_stamp_event.af;
info!(
"TimeStampEvent - delta_us: {}, ts_us: {}, tgid: {}, comm: {}, lport: {}, dport_be: {}, af: {}",
delta_us, ts_us, tgid, comm, lport, dport_be, af
);
} else {
info!("Received timestamp data too small: {} bytes", data.len());
}
}
}
Err(e) => {
error!("Error reading timestamp events: {:?}", e);
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
info!("Timestamp event listener stopped");
}

pub async fn event_listener(bpf_maps: (Map, Map)) -> Result<(), anyhow::Error> {
info!("Getting CPU count...");
let cpu_count = online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))?.len();
info!("CPU count: {}", cpu_count);

info!("Creating perf buffers...");
let mut net_perf_buffer: Vec<PerfEventArrayBuffer<MapData>> = Vec::new();
let mut net_perf_array: PerfEventArray<MapData> = PerfEventArray::try_from(bpf_maps.0)?;
let mut time_stamp_events_perf_buffer: Vec<PerfEventArrayBuffer<MapData>> = Vec::new();
let mut time_stamp_events_perf_array: PerfEventArray<MapData> =
PerfEventArray::try_from(bpf_maps.1)?;

info!("Opening perf buffers for {} CPUs...", cpu_count);
for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? {
let buf: PerfEventArrayBuffer<MapData> = net_perf_array.open(cpu_id, None)?;
net_perf_buffer.push(buf);
}
for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? {
let buf: PerfEventArrayBuffer<MapData> = time_stamp_events_perf_array.open(cpu_id, None)?;
time_stamp_events_perf_buffer.push(buf);
}
info!("Perf buffers created successfully");

// Create shared running flags
let net_metrics_running = Arc::new(AtomicBool::new(true));
let time_stamp_events_running = Arc::new(AtomicBool::new(true));

// Create proper sized buffers
let net_metrics_buffers = vec![BytesMut::with_capacity(1024); cpu_count];
let time_stamp_events_buffers = vec![BytesMut::with_capacity(1024); cpu_count];

// Clone for the signal handler
let net_metrics_running_signal = net_metrics_running.clone();
let time_stamp_events_running_signal = time_stamp_events_running.clone();

info!("Starting event listener tasks...");
let metrics_map_displayer = tokio::spawn(async move {
display_metrics_map(net_perf_buffer, net_metrics_running, net_metrics_buffers).await;
});

let time_stamp_events_displayer = tokio::spawn(async move {
display_time_stamp_events_map(time_stamp_events_perf_buffer, time_stamp_events_running, time_stamp_events_buffers).await
});

info!("Event listeners started, entering main loop...");

tokio::select! {
result = metrics_map_displayer => {
if let Err(e) = result {
error!("Metrics map displayer task failed: {:?}", e);
}
}

result = time_stamp_events_displayer => {
if let Err(e) = result {
error!("Time stamp events displayer task failed: {:?}", e);
}
}

_ = signal::ctrl_c() => {
info!("Ctrl-C received, shutting down...");
// Stop the event loops
net_metrics_running_signal.store(false, std::sync::atomic::Ordering::SeqCst);
time_stamp_events_running_signal.store(false, std::sync::atomic::Ordering::SeqCst);
}
}

// return success
Ok(())
}
101 changes: 50 additions & 51 deletions core/src/components/metrics/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
use aya::{
Ebpf,
maps::{
MapData,
perf::{PerfEventArray, PerfEventArrayBuffer},
},
programs::{KProbe},
util::online_cpus,
Ebpf
};

use bytes::BytesMut;
use std::{
convert::TryInto,
env, fs,
path::Path,
sync::{
atomic::{AtomicBool},
Arc, Mutex,
},
};

use anyhow::{Context, Ok};
use tokio::{signal};
use tracing::{error, info};
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};

const BPF_PATH: &str = "BPF_PATH"; //BPF env path
const PIN_MAP_PATH: &str = "PIN_MAP_PATH";

mod helpers;
use crate::helpers::display_metrics_map;
use crate::{helpers::event_listener, maps_handlers::map_pinner, program_handlers::load_and_attach_tcp_programs};

mod maps_handlers;
use crate::maps_handlers::init_ebpf_maps;

mod program_handlers;
use crate::program_handlers::load_program;

mod structs;

Expand All @@ -49,49 +47,50 @@ async fn main() -> Result<(), anyhow::Error> {

let bpf_path = env::var(BPF_PATH).context("BPF_PATH environment variable required")?;
let data = fs::read(Path::new(&bpf_path)).context("Failed to load file from path")?;
let mut bpf = Ebpf::load(&data)?;
//init bpf logger
let bpf = Arc::new(Mutex::new(Ebpf::load(&data)?));
let tcp_bpf = bpf.clone();
let tcp_rev_bpf = bpf.clone();

info!("Running Ebpf logger");
info!("loading programs");
let net_metrics_map = bpf
.take_map("net_metrics")
.ok_or_else(|| anyhow::anyhow!("net_metrics map not found"))?;

let program: &mut KProbe = bpf
.program_mut("metrics_tracer")
.ok_or_else(|| anyhow::anyhow!("program 'metrics_tracer' not found"))?
.try_into()
.context("Failed to init Kprobe program")?;

program
.load()
.context("Failed to load metrics_tracer program")?;

match program.attach("tcp_identify_packet_loss", 0) {
std::result::Result::Ok(_) => {
info!("program attached successfully to the tcp_identify_packet_loss kprobe ")
}
Err(e) => error!(
"An error occured while attaching the program to the tcp_identify_packet_loss kprobe. {:?} ",
e
),
}
let mut net_perf_buffer: Vec<PerfEventArrayBuffer<MapData>> = Vec::new();
let mut net_perf_array: PerfEventArray<MapData> = PerfEventArray::try_from(net_metrics_map)?;
let bpf_map_save_path =
std::env::var(PIN_MAP_PATH).context("PIN_MAP_PATH environment variable required")?;

for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? {
let buf: PerfEventArrayBuffer<MapData> = net_perf_array.open(cpu_id, None)?;
net_perf_buffer.push(buf);
}
let running = AtomicBool::new(true);
match init_ebpf_maps(bpf.clone()) {
std::result::Result::Ok(maps) => {
info!("BPF maps loaded successfully");
match map_pinner(&maps, &bpf_map_save_path.clone().into()).await {
std::result::Result::Ok(_) => {
info!("BPF maps pinned successfully to {}", bpf_map_save_path);

{
load_program(bpf.clone(), "metrics_tracer", "tcp_identify_packet_loss")
.context("An error occured during the execution of load_program function")?;
}

{
load_and_attach_tcp_programs(tcp_bpf.clone())
.context("An error occured during the execution of load_and_attach_tcp_programs function")?;
}

let buffers = vec![BytesMut::with_capacity(1024); 10];
{
load_program(tcp_rev_bpf.clone(), "tcp_rcv_state_process", "tcp_rcv_state_process")
.context("An error occured during the execution of load_program function")?;
}


tokio::spawn(async move{
display_metrics_map(net_perf_buffer, running, buffers).await;
});
event_listener(maps).await?;
}
Err(e) => {
error!("Error pinning BPF maps: {:?}", e);
return Err(e);
}
}
}
Err(e) => {
error!("Error initializing BPF maps: {:?}", e);
return Err(e);
}
}

signal::ctrl_c().await?;
Ok(())
}
}
48 changes: 48 additions & 0 deletions core/src/components/metrics/src/maps_handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::{path::PathBuf, sync::{Arc, Mutex}};
use tokio::fs;
use anyhow::Error;
use aya::{maps::Map, Ebpf};
use tracing::info;



pub fn init_ebpf_maps(bpf: Arc<Mutex<Ebpf>>) -> Result<(Map, Map), anyhow::Error> {
// this function init the bpfs maps used in the main program
/*
index 0: net_metrics
index 1: time_stamp_events
*/
let mut bpf_new = bpf.lock().unwrap();

let net_metrics_map = bpf_new
.take_map("net_metrics")
.ok_or_else(|| anyhow::anyhow!("net_metrics map not found"))?;

let time_stamps_events_map = bpf_new
.take_map("time_stamp_events")
.ok_or_else(|| anyhow::anyhow!("time_stamp_events map not found"))?;

Ok((net_metrics_map, time_stamps_events_map))
}

pub async fn map_pinner(maps: &(Map, Map), path: &PathBuf) -> Result<(), Error> {
// check if the map exists
if !path.exists() {
info!("Pin path {:?} does not exist. Creating it...", path);
fs::create_dir_all(&path).await?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)).await?;
}
}

let map1_path = path.join("net_metrics");
let map2_path = path.join("time_stamp_events");

// maps pinning
maps.0.pin(&map1_path)?;
maps.1.pin(&map2_path)?;

Ok(())
}
Loading