Skip to content

Commit 4dba0f2

Browse files
[#150]: Restore init_tc_classifier function (was previosly commented for development reasons). Updated identity.yaml manifest
1 parent c3618c8 commit 4dba0f2

2 files changed

Lines changed: 88 additions & 63 deletions

File tree

core/src/components/identity/src/main.rs

Lines changed: 87 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,39 @@
1212

1313
mod enums;
1414
mod helpers;
15-
mod structs;
1615
mod map_handlers;
16+
mod structs;
1717

1818
use aya::{
19-
Ebpf, maps::{ Map, MapData, perf::{ PerfEventArray, PerfEventArrayBuffer } }, programs::{ KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId }, util::online_cpus
19+
Ebpf,
20+
maps::{
21+
Map, MapData,
22+
perf::{PerfEventArray, PerfEventArrayBuffer},
23+
},
24+
programs::{KProbe, SchedClassifier, TcAttachType, tc::SchedClassifierLinkId},
25+
util::online_cpus,
2026
};
2127

2228
use crate::helpers::{
23-
display_events,
24-
display_veth_events,
25-
get_veth_channels,
26-
display_tcp_registry_events,
27-
scan_cgroup_cronjob
29+
display_events, display_tcp_registry_events, display_veth_events, get_veth_channels,
30+
scan_cgroup_cronjob,
2831
};
29-
use crate::map_handlers::{ init_bpf_maps, map_pinner, populate_blocklist };
32+
use crate::map_handlers::{init_bpf_maps, map_pinner, populate_blocklist};
3033

3134
use bytes::BytesMut;
32-
use std::{ convert::TryInto, path::Path, sync::{ Arc, Mutex, atomic::{ AtomicBool, Ordering } } };
35+
use std::{
36+
convert::TryInto,
37+
path::Path,
38+
sync::{
39+
Arc, Mutex,
40+
atomic::{AtomicBool, Ordering},
41+
},
42+
};
3343

34-
use anyhow::{ Context, Ok };
35-
use tokio::{ fs, signal };
36-
use tracing::{ error, info };
37-
use cortexbrain_common::{ constants, logger };
44+
use anyhow::{Context, Ok};
45+
use cortexbrain_common::{constants, logger};
46+
use tokio::{fs, signal};
47+
use tracing::{error, info};
3848

3949
use std::collections::HashMap;
4050

@@ -50,15 +60,15 @@ async fn main() -> Result<(), anyhow::Error> {
5060
let link_ids = Arc::new(Mutex::new(HashMap::<String, SchedClassifierLinkId>::new()));
5161

5262
//init conntracker data path
53-
let bpf_path = std::env
54-
::var(constants::BPF_PATH)
55-
.context("BPF_PATH environment variable required")?;
56-
let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?;
63+
let bpf_path =
64+
std::env::var(constants::BPF_PATH).context("BPF_PATH environment variable required")?;
65+
let data = fs::read(Path::new(&bpf_path))
66+
.await
67+
.context("failed to load file from path")?;
5768

5869
//init bpf data
5970
let bpf = Arc::new(Mutex::new(Ebpf::load(&data)?));
60-
let bpf_map_save_path = std::env
61-
::var(constants::PIN_MAP_PATH)
71+
let bpf_map_save_path = std::env::var(constants::PIN_MAP_PATH)
6272
.context("PIN_MAP_PATH environment variable required")?;
6373

6474
match init_bpf_maps(bpf.clone()) {
@@ -82,20 +92,20 @@ async fn main() -> Result<(), anyhow::Error> {
8292
populate_blocklist(&mut bpf_maps.2).await;
8393
}
8494

85-
//{
86-
// init_tc_classifier(bpf.clone(), interfaces, link_ids.clone()).await.context(
87-
// "An error occured during the execution of attach_bpf_program function"
88-
// )?;
89-
//}
95+
{
96+
init_tc_classifier(bpf.clone(), interfaces, link_ids.clone()).await.context(
97+
"An error occured during the execution of attach_bpf_program function"
98+
)?;
99+
}
90100
{
91101
init_tcp_registry(bpf.clone()).await.context(
92-
"An error occured during the execution of init_tcp_registry function"
102+
"An error occured during the execution of init_tcp_registry function",
93103
)?;
94104
}
95105

96-
event_listener(bpf_maps, link_ids.clone(), bpf.clone()).await.context(
97-
"Error initializing event_listener"
98-
)?;
106+
event_listener(bpf_maps, link_ids.clone(), bpf.clone())
107+
.await
108+
.context("Error initializing event_listener")?;
99109
}
100110
Err(e) => {
101111
error!("Error while pinning bpf_maps: {}", e);
@@ -115,7 +125,7 @@ async fn main() -> Result<(), anyhow::Error> {
115125
async fn init_tc_classifier(
116126
bpf: Arc<Mutex<Ebpf>>,
117127
ifaces: Vec<String>,
118-
link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>
128+
link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
119129
) -> Result<(), anyhow::Error> {
120130
//this funtion initialize the tc classifier program
121131
info!("Loading programs");
@@ -128,16 +138,24 @@ async fn init_tc_classifier(
128138
.try_into()
129139
.context("Failed to init SchedClassifier program")?;
130140

131-
program.load().context("Failed to load identity_classifier program")?;
141+
program
142+
.load()
143+
.context("Failed to load identity_classifier program")?;
132144

133145
for interface in ifaces {
134146
match program.attach(&interface, TcAttachType::Ingress) {
135147
std::result::Result::Ok(link_id) => {
136-
info!("Program 'identity_classifier' attached to interface {}", interface);
148+
info!(
149+
"Program 'identity_classifier' attached to interface {}",
150+
interface
151+
);
137152
let mut map = link_ids.lock().unwrap();
138153
map.insert(interface.clone(), link_id);
139154
}
140-
Err(e) => error!("Error attaching program to interface {}: {:?}", interface, e),
155+
Err(e) => error!(
156+
"Error attaching program to interface {}: {:?}",
157+
interface, e
158+
),
141159
}
142160
}
143161

@@ -166,7 +184,9 @@ async fn init_veth_tracer(bpf: Arc<Mutex<Ebpf>>) -> Result<(), anyhow::Error> {
166184
.program_mut("veth_deletion_trace")
167185
.ok_or_else(|| anyhow::anyhow!("program 'veth_deletion_trace' not found"))?
168186
.try_into()?;
169-
veth_deletion_tracer.load().context("Failed to load deletetion_tracer program")?;
187+
veth_deletion_tracer
188+
.load()
189+
.context("Failed to load deletetion_tracer program")?;
170190

171191
match veth_deletion_tracer.attach("unregister_netdevice_queue", 0) {
172192
std::result::Result::Ok(_) => info!("veth_deletion_trace program attached successfully"),
@@ -185,25 +205,30 @@ async fn init_tcp_registry(bpf: Arc<Mutex<Ebpf>>) -> Result<(), anyhow::Error> {
185205
.ok_or_else(|| anyhow::anyhow!("program 'tcp_message_tracer' not found"))?
186206
.try_into()?;
187207

188-
tcp_analyzer.load().context("Failed to load tcp_message_tracer")?;
208+
tcp_analyzer
209+
.load()
210+
.context("Failed to load tcp_message_tracer")?;
189211

190212
info!("initializing tcp tracing functions");
191213

192214
match tcp_analyzer.attach("tcp_v4_rcv", 0) {
193-
std::result::Result::Ok(_) =>
194-
info!("tcp_message_tracer attached successfully to the tcp_v4_rcv function "),
195-
Err(e) =>
196-
error!("Error attaching tcp_message_tracer to the tcp_v4_rcv function. Error: {:?}", e),
215+
std::result::Result::Ok(_) => {
216+
info!("tcp_message_tracer attached successfully to the tcp_v4_rcv function ")
217+
}
218+
Err(e) => error!(
219+
"Error attaching tcp_message_tracer to the tcp_v4_rcv function. Error: {:?}",
220+
e
221+
),
197222
}
198223

199224
match tcp_analyzer.attach("tcp_v4_connect", 0) {
200-
std::result::Result::Ok(_) =>
201-
info!("tcp_message_tracer attached successfully to the tcp_v4_connect function "),
202-
Err(e) =>
203-
error!(
204-
"Error attaching tcp_message_tracer to the tcp_v4_connect function. Error: {:?}",
205-
e
206-
),
225+
std::result::Result::Ok(_) => {
226+
info!("tcp_message_tracer attached successfully to the tcp_v4_connect function ")
227+
}
228+
Err(e) => error!(
229+
"Error attaching tcp_message_tracer to the tcp_v4_connect function. Error: {:?}",
230+
e
231+
),
207232
}
208233

209234
Ok(())
@@ -212,7 +237,7 @@ async fn init_tcp_registry(bpf: Arc<Mutex<Ebpf>>) -> Result<(), anyhow::Error> {
212237
async fn event_listener(
213238
bpf_maps: (Map, Map, Map, Map),
214239
link_ids: Arc<Mutex<HashMap<String, SchedClassifierLinkId>>>,
215-
bpf: Arc<Mutex<Ebpf>>
240+
bpf: Arc<Mutex<Ebpf>>,
216241
) -> Result<(), anyhow::Error> {
217242
// this function init the event listener. Listens for veth events (creation/deletion) and network events (pod to pod communications)
218243
/* Doc:
@@ -252,10 +277,8 @@ async fn event_listener(
252277
perf_net_events_buffer.push(events_buf);
253278
}
254279
for cpu_id in online_cpus().map_err(|e| anyhow::anyhow!("Error {:?}", e))? {
255-
let tcp_registry_buf: PerfEventArrayBuffer<MapData> = tcp_registry_array.open(
256-
cpu_id,
257-
None
258-
)?;
280+
let tcp_registry_buf: PerfEventArrayBuffer<MapData> =
281+
tcp_registry_array.open(cpu_id, None)?;
259282
tcp_registry_buffer.push(tcp_registry_buf);
260283
}
261284

@@ -284,13 +307,15 @@ async fn event_listener(
284307
perf_veth_buffer,
285308
veth_running,
286309
veth_buffers,
287-
veth_link_ids
288-
).await;
310+
veth_link_ids,
311+
)
312+
.await;
289313
});
290314

291-
//let net_events_displayer = tokio::spawn(async move {
292-
// display_events(perf_net_events_buffer, net_events_running, events_buffers).await;
293-
//});
315+
// IDEA: Maybe we don't need to display all this events
316+
let net_events_displayer = tokio::spawn(async move {
317+
display_events(perf_net_events_buffer, net_events_running, events_buffers).await;
318+
});
294319

295320
let tcp_registry_events_displayer: tokio::task::JoinHandle<()> = tokio::spawn(async move {
296321
display_tcp_registry_events(tcp_registry_buffer, tcp_registry_running, tcp_buffers).await;
@@ -314,13 +339,13 @@ async fn event_listener(
314339
}
315340
}
316341

317-
//result = net_events_displayer=>{
318-
// match result{
319-
// Err(e)=>error!("net_event_displayer panicked {:?}",e),
320-
// std::result::Result::Ok(_) => info!("Found new net_event"),
321-
// }
322-
//}
323-
342+
result = net_events_displayer=>{
343+
match result{
344+
Err(e)=>error!("net_event_displayer panicked {:?}",e),
345+
std::result::Result::Ok(_) => info!("Found new net_event"),
346+
}
347+
}
348+
324349
result = tcp_registry_events_displayer => {
325350
match result{
326351
Err(e)=>error!("tcp_registry_events_displayer panicked {:?}",e),

core/src/testing/identity.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ spec:
5353
- SYS_PTRACE
5454
containers:
5555
- name: identity
56-
image: lorenzotettamanti/cortexflow-identity:0.1.1-cgroup_scannerv16
56+
image: lorenzotettamanti/cortexflow-identity:0.1.1-cgroup_scanner_exp
5757
command: ["/bin/bash", "-c"]
5858
args:
5959
- |

0 commit comments

Comments
 (0)