1- use std:: { io, sync:: Arc } ;
1+ use std:: { io, path :: PathBuf , sync:: Arc } ;
22
33use anyhow:: { bail, Context } ;
44use aya:: {
5- maps:: { LpmTrie , MapData , PerCpuArray , RingBuf } ,
5+ maps:: { Array , LpmTrie , MapData , PerCpuArray , RingBuf } ,
66 programs:: Lsm ,
77 Btf , Ebpf ,
88} ;
99use libc:: c_char;
1010use log:: { debug, error, info} ;
1111use tokio:: {
1212 io:: unix:: AsyncFd ,
13- sync:: { broadcast, watch:: Receiver } ,
13+ sync:: { broadcast, watch} ,
1414 task:: JoinHandle ,
1515} ;
1616
17- use crate :: { config :: FactConfig , event:: Event , host_info, metrics:: EventCounter } ;
17+ use crate :: { event:: Event , host_info, metrics:: EventCounter } ;
1818
1919use fact_ebpf:: { event_t, metrics_t, path_prefix_t, LPM_SIZE_MAX } ;
2020
21+ const RINGBUFFER_NAME : & str = "rb" ;
22+
2123pub struct Bpf {
22- // The Ebpf object needs to live for as long as we want to keep the
23- // programs loaded
24- #[ allow( dead_code) ]
2524 obj : Ebpf ,
26- pub fd : AsyncFd < RingBuf < MapData > > ,
25+
26+ tx : broadcast:: Sender < Arc < Event > > ,
27+
28+ paths : Vec < path_prefix_t > ,
29+ paths_config : watch:: Receiver < Vec < PathBuf > > ,
2730}
2831
2932impl Bpf {
30- pub fn new ( config : & FactConfig ) -> anyhow:: Result < Self > {
31- const RINGBUFFER_NAME : & str = "rb" ;
32-
33+ pub fn new (
34+ paths_config : watch:: Receiver < Vec < PathBuf > > ,
35+ ringbuf_size : u32 ,
36+ ) -> anyhow:: Result < Self > {
3337 Bpf :: bump_memlock_rlimit ( ) ?;
3438
3539 // Include the BPF object as raw bytes at compile-time and load it
3640 // at runtime.
37- let mut obj = aya:: EbpfLoader :: new ( )
38- . set_global (
39- "filter_by_prefix" ,
40- & ( ( !config. paths ( ) . is_empty ( ) ) as u8 ) ,
41- true ,
42- )
41+ let obj = aya:: EbpfLoader :: new ( )
4342 . set_global ( "host_mount_ns" , & host_info:: get_host_mount_ns ( ) , true )
44- . set_max_entries ( RINGBUFFER_NAME , config . ringbuf_size ( ) * 1024 )
43+ . set_max_entries ( RINGBUFFER_NAME , ringbuf_size * 1024 )
4544 . load ( fact_ebpf:: EBPF_OBJ ) ?;
4645
47- let ringbuf = match obj. take_map ( RINGBUFFER_NAME ) {
48- Some ( r) => r,
49- None => bail ! ( "Ring buffer not found" ) ,
50- } ;
51- let ringbuf = RingBuf :: try_from ( ringbuf) ?;
52- let fd = AsyncFd :: new ( ringbuf) ?;
53-
54- let Some ( path_prefix) = obj. take_map ( "path_prefix" ) else {
55- bail ! ( "path_prefix map not found" ) ;
56- } ;
57- let mut path_prefix: LpmTrie < MapData , [ c_char ; LPM_SIZE_MAX as usize ] , c_char > =
58- LpmTrie :: try_from ( path_prefix) ?;
59- for p in config. paths ( ) {
60- let prefix = path_prefix_t:: try_from ( p) ?;
61- path_prefix. insert ( & prefix. into ( ) , 0 , 0 ) ?;
62- }
63-
64- let btf = Btf :: from_sys_fs ( ) ?;
65- let Some ( trace_file_open) = obj. program_mut ( "trace_file_open" ) else {
66- bail ! ( "trace_file_open program not found" ) ;
46+ let paths = Vec :: new ( ) ;
47+ let ( tx, _) = broadcast:: channel ( 100 ) ;
48+ let mut bpf = Bpf {
49+ obj,
50+ tx,
51+ paths,
52+ paths_config,
6753 } ;
68- let trace_file_open: & mut Lsm = trace_file_open. try_into ( ) ?;
69- trace_file_open. load ( "file_open" , & btf) ?;
70- trace_file_open. attach ( ) ?;
7154
72- let Some ( trace_path_unlink) = obj. program_mut ( "trace_path_unlink" ) else {
73- bail ! ( "trace_path_unlink program not found" ) ;
74- } ;
75- let trace_path_unlink: & mut Lsm = trace_path_unlink. try_into ( ) ?;
76- trace_path_unlink. load ( "path_unlink" , & btf) ?;
77- trace_path_unlink. attach ( ) ?;
55+ bpf. load_paths ( ) ?;
56+ bpf. load_progs ( ) ?;
7857
79- Ok ( Bpf { obj, fd } )
80- }
81-
82- pub fn get_metrics ( & mut self ) -> anyhow:: Result < PerCpuArray < MapData , metrics_t > > {
83- let metrics = match self . obj . take_map ( "metrics" ) {
84- Some ( m) => m,
85- None => bail ! ( "metrics map not found" ) ,
86- } ;
87- Ok ( PerCpuArray :: try_from ( metrics) ?)
58+ Ok ( bpf)
8859 }
8960
9061 fn bump_memlock_rlimit ( ) -> anyhow:: Result < ( ) > {
@@ -104,21 +75,101 @@ impl Bpf {
10475 Ok ( ( ) )
10576 }
10677
78+ pub fn subscribe ( & self ) -> broadcast:: Receiver < Arc < Event > > {
79+ self . tx . subscribe ( )
80+ }
81+
82+ pub fn take_metrics ( & mut self ) -> anyhow:: Result < PerCpuArray < MapData , metrics_t > > {
83+ let metrics = match self . obj . take_map ( "metrics" ) {
84+ Some ( m) => m,
85+ None => bail ! ( "metrics map not found" ) ,
86+ } ;
87+ Ok ( PerCpuArray :: try_from ( metrics) ?)
88+ }
89+
90+ fn take_ringbuffer ( & mut self ) -> anyhow:: Result < RingBuf < MapData > > {
91+ let ringbuf = match self . obj . take_map ( RINGBUFFER_NAME ) {
92+ Some ( r) => r,
93+ None => bail ! ( "Ring buffer not found" ) ,
94+ } ;
95+ Ok ( RingBuf :: try_from ( ringbuf) ?)
96+ }
97+
98+ fn load_paths ( & mut self ) -> anyhow:: Result < ( ) > {
99+ let paths_config = self . paths_config . borrow ( ) ;
100+ let Some ( filter_by_prefix) = self . obj . map_mut ( "filter_by_prefix_map" ) else {
101+ bail ! ( "filter_by_prefix_map map not found" ) ;
102+ } ;
103+ let mut filter_by_prefix: Array < & mut MapData , c_char > = Array :: try_from ( filter_by_prefix) ?;
104+ filter_by_prefix. set ( 0 , !paths_config. is_empty ( ) as c_char , 0 ) ?;
105+
106+ let Some ( path_prefix) = self . obj . map_mut ( "path_prefix" ) else {
107+ bail ! ( "path_prefix map not found" ) ;
108+ } ;
109+ let mut path_prefix: LpmTrie < & mut MapData , [ c_char ; LPM_SIZE_MAX as usize ] , c_char > =
110+ LpmTrie :: try_from ( path_prefix) ?;
111+
112+ // Add the new prefixes
113+ let mut new_paths = Vec :: with_capacity ( paths_config. len ( ) ) ;
114+ for p in paths_config. iter ( ) {
115+ let prefix = path_prefix_t:: try_from ( p) ?;
116+ path_prefix. insert ( & prefix. into ( ) , 0 , 0 ) ?;
117+ new_paths. push ( prefix) ;
118+ }
119+
120+ // Remove old prefixes
121+ for p in self . paths . iter ( ) . filter ( |p| !new_paths. contains ( p) ) {
122+ path_prefix. remove ( & ( * p) . into ( ) ) ?;
123+ }
124+
125+ self . paths = new_paths;
126+
127+ Ok ( ( ) )
128+ }
129+
130+ fn load_lsm_prog ( & mut self , name : & str , hook : & str , btf : & Btf ) -> anyhow:: Result < ( ) > {
131+ let Some ( prog) = self . obj . program_mut ( name) else {
132+ bail ! ( "{name} program not found" ) ;
133+ } ;
134+ let prog: & mut Lsm = prog. try_into ( ) ?;
135+ prog. load ( hook, btf) ?;
136+ Ok ( ( ) )
137+ }
138+
139+ fn load_progs ( & mut self ) -> anyhow:: Result < ( ) > {
140+ let btf = Btf :: from_sys_fs ( ) ?;
141+ self . load_lsm_prog ( "trace_file_open" , "file_open" , & btf) ?;
142+ self . load_lsm_prog ( "trace_path_unlink" , "path_unlink" , & btf)
143+ }
144+
145+ fn attach_progs ( & mut self ) -> anyhow:: Result < ( ) > {
146+ for ( _, prog) in self . obj . programs_mut ( ) {
147+ let prog: & mut Lsm = prog. try_into ( ) ?;
148+ prog. attach ( ) ?;
149+ }
150+ Ok ( ( ) )
151+ }
152+
107153 // Gather events from the ring buffer and print them out.
108- pub fn start_worker (
109- output : broadcast:: Sender < Arc < Event > > ,
110- mut fd : AsyncFd < RingBuf < MapData > > ,
111- mut running : Receiver < bool > ,
154+ pub fn start (
155+ mut self ,
156+ mut running : watch:: Receiver < bool > ,
112157 event_counter : EventCounter ,
113- ) -> JoinHandle < ( ) > {
158+ ) -> JoinHandle < anyhow :: Result < ( ) > > {
114159 info ! ( "Starting BPF worker..." ) ;
160+
115161 tokio:: spawn ( async move {
162+ self . attach_progs ( )
163+ . context ( "Failed to attach ebpf programs" ) ?;
164+
165+ let rb = self . take_ringbuffer ( ) ?;
166+ let mut fd = AsyncFd :: new ( rb) ?;
167+
116168 loop {
117169 tokio:: select! {
118170 guard = fd. readable_mut( ) => {
119171 let mut guard = guard
120- . context( "ringbuffer guard held while runtime is stopping" )
121- . unwrap( ) ;
172+ . context( "ringbuffer guard held while runtime is stopping" ) ?;
122173 let ringbuf = guard. get_inner_mut( ) ;
123174 while let Some ( event) = ringbuf. next( ) {
124175 let event: & event_t = unsafe { & * ( event. as_ptr( ) as * const _) } ;
@@ -133,21 +184,26 @@ impl Bpf {
133184 } ;
134185
135186 event_counter. added( ) ;
136- if output . send( event) . is_err( ) {
187+ if self . tx . send( event) . is_err( ) {
137188 info!( "No BPF consumers left, stopping..." ) ;
138- return ;
189+ break ;
139190 }
140191 }
141192 guard. clear_ready( ) ;
142193 } ,
194+ _ = self . paths_config. changed( ) => {
195+ self . load_paths( ) . context( "Failed to load paths" ) ?;
196+ } ,
143197 _ = running. changed( ) => {
144198 if !* running. borrow( ) {
145199 info!( "Stopping BPF worker..." ) ;
146- return ;
200+ break ;
147201 }
148202 } ,
149203 }
150204 }
205+
206+ Ok ( ( ) )
151207 } )
152208 }
153209}
@@ -161,7 +217,12 @@ mod bpf_tests {
161217 use tempfile:: NamedTempFile ;
162218 use tokio:: { sync:: watch, time:: timeout} ;
163219
164- use crate :: { event:: process:: Process , host_info, metrics:: exporter:: Exporter } ;
220+ use crate :: {
221+ config:: { reloader:: Reloader , FactConfig } ,
222+ event:: process:: Process ,
223+ host_info,
224+ metrics:: exporter:: Exporter ,
225+ } ;
165226
166227 use super :: * ;
167228
@@ -175,20 +236,31 @@ mod bpf_tests {
175236
176237 #[ test]
177238 fn test_basic ( ) {
239+ if let Ok ( value) = std:: env:: var ( "FACT_LOGLEVEL" ) {
240+ let value = value. to_lowercase ( ) ;
241+ if value == "debug" || value == "trace" {
242+ crate :: init_log ( ) . unwrap ( ) ;
243+ }
244+ }
245+
178246 let executor = get_executor ( ) . unwrap ( ) ;
179247 let monitored_path = env ! ( "CARGO_MANIFEST_DIR" ) ;
180248 let monitored_path = PathBuf :: from ( monitored_path) ;
181249 let paths = vec ! [ monitored_path. clone( ) ] ;
182250 let mut config = FactConfig :: default ( ) ;
183251 config. set_paths ( paths) ;
252+ let reloader = Reloader :: from ( config) ;
184253 executor. block_on ( async {
185- let mut bpf = Bpf :: new ( & config) . expect ( "Failed to load BPF code" ) ;
186- let ( tx, mut rx) = broadcast:: channel ( 100 ) ;
254+ let mut bpf = Bpf :: new ( reloader. paths ( ) , reloader. config ( ) . ringbuf_size ( ) )
255+ . expect ( "Failed to load BPF code" ) ;
256+ let mut rx = bpf. subscribe ( ) ;
187257 let ( run_tx, run_rx) = watch:: channel ( true ) ;
188258 // Create a metrics exporter, but don't start it
189- let exporter = Exporter :: new ( bpf. get_metrics ( ) . unwrap ( ) ) ;
259+ let exporter = Exporter :: new ( bpf. take_metrics ( ) . unwrap ( ) ) ;
190260
191- Bpf :: start_worker ( tx, bpf. fd , run_rx, exporter. metrics . bpf_worker . clone ( ) ) ;
261+ let handle = bpf. start ( run_rx, exporter. metrics . bpf_worker . clone ( ) ) ;
262+
263+ tokio:: time:: sleep ( Duration :: from_millis ( 500 ) ) . await ;
192264
193265 // Create a file
194266 let file =
@@ -205,16 +277,19 @@ mod bpf_tests {
205277 . unwrap ( ) ;
206278
207279 println ! ( "Expected: {expected:?}" ) ;
208- timeout ( Duration :: from_secs ( 1 ) , async move {
280+ let wait = timeout ( Duration :: from_secs ( 1 ) , async move {
209281 while let Ok ( event) = rx. recv ( ) . await {
210282 println ! ( "{event:?}" ) ;
211283 if * event == expected {
212284 break ;
213285 }
214286 }
215- } )
216- . await
217- . unwrap ( ) ;
287+ } ) ;
288+
289+ tokio:: select! {
290+ res = wait => res. unwrap( ) ,
291+ res = handle => res. unwrap( ) . unwrap( ) ,
292+ }
218293
219294 run_tx. send ( false ) . unwrap ( ) ;
220295 } ) ;
0 commit comments