1- use aya:: {
2- maps:: {
3- MapData ,
4- perf:: { PerfEventArrayBuffer } ,
5- }
6- } ;
1+ use aya:: { maps:: {
2+ perf:: PerfEventArrayBuffer , Map , MapData , PerfEventArray
3+ } , util:: online_cpus} ;
74
85use bytes:: BytesMut ;
6+ use tokio:: signal;
97use std:: {
108 sync:: {
119 Arc ,
@@ -104,4 +102,75 @@ pub async fn display_time_stamp_events_map(
104102 tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 100 ) ) . await ;
105103 }
106104 info ! ( "Timestamp event listener stopped" ) ;
105+ }
106+
107+ pub async fn event_listener ( bpf_maps : ( Map , Map ) ) -> Result < ( ) , anyhow:: Error > {
108+ info ! ( "Getting CPU count..." ) ;
109+ let cpu_count = online_cpus ( ) . map_err ( |e| anyhow:: anyhow!( "Error {:?}" , e) ) ?. len ( ) ;
110+ info ! ( "CPU count: {}" , cpu_count) ;
111+
112+ info ! ( "Creating perf buffers..." ) ;
113+ let mut net_perf_buffer: Vec < PerfEventArrayBuffer < MapData > > = Vec :: new ( ) ;
114+ let mut net_perf_array: PerfEventArray < MapData > = PerfEventArray :: try_from ( bpf_maps. 0 ) ?;
115+ let mut time_stamp_events_perf_buffer: Vec < PerfEventArrayBuffer < MapData > > = Vec :: new ( ) ;
116+ let mut time_stamp_events_perf_array: PerfEventArray < MapData > =
117+ PerfEventArray :: try_from ( bpf_maps. 1 ) ?;
118+
119+ info ! ( "Opening perf buffers for {} CPUs..." , cpu_count) ;
120+ for cpu_id in online_cpus ( ) . map_err ( |e| anyhow:: anyhow!( "Error {:?}" , e) ) ? {
121+ let buf: PerfEventArrayBuffer < MapData > = net_perf_array. open ( cpu_id, None ) ?;
122+ net_perf_buffer. push ( buf) ;
123+ }
124+ for cpu_id in online_cpus ( ) . map_err ( |e| anyhow:: anyhow!( "Error {:?}" , e) ) ? {
125+ let buf: PerfEventArrayBuffer < MapData > = time_stamp_events_perf_array. open ( cpu_id, None ) ?;
126+ time_stamp_events_perf_buffer. push ( buf) ;
127+ }
128+ info ! ( "Perf buffers created successfully" ) ;
129+
130+ // Create shared running flags
131+ let net_metrics_running = Arc :: new ( AtomicBool :: new ( true ) ) ;
132+ let time_stamp_events_running = Arc :: new ( AtomicBool :: new ( true ) ) ;
133+
134+ // Create proper sized buffers
135+ let net_metrics_buffers = vec ! [ BytesMut :: with_capacity( 1024 ) ; cpu_count] ;
136+ let time_stamp_events_buffers = vec ! [ BytesMut :: with_capacity( 1024 ) ; cpu_count] ;
137+
138+ // Clone for the signal handler
139+ let net_metrics_running_signal = net_metrics_running. clone ( ) ;
140+ let time_stamp_events_running_signal = time_stamp_events_running. clone ( ) ;
141+
142+ info ! ( "Starting event listener tasks..." ) ;
143+ let metrics_map_displayer = tokio:: spawn ( async move {
144+ display_metrics_map ( net_perf_buffer, net_metrics_running, net_metrics_buffers) . await ;
145+ } ) ;
146+
147+ let time_stamp_events_displayer = tokio:: spawn ( async move {
148+ display_time_stamp_events_map ( time_stamp_events_perf_buffer, time_stamp_events_running, time_stamp_events_buffers) . await
149+ } ) ;
150+
151+ info ! ( "Event listeners started, entering main loop..." ) ;
152+
153+ tokio:: select! {
154+ result = metrics_map_displayer => {
155+ if let Err ( e) = result {
156+ error!( "Metrics map displayer task failed: {:?}" , e) ;
157+ }
158+ }
159+
160+ result = time_stamp_events_displayer => {
161+ if let Err ( e) = result {
162+ error!( "Time stamp events displayer task failed: {:?}" , e) ;
163+ }
164+ }
165+
166+ _ = signal:: ctrl_c( ) => {
167+ info!( "Ctrl-C received, shutting down..." ) ;
168+ // Stop the event loops
169+ net_metrics_running_signal. store( false , std:: sync:: atomic:: Ordering :: SeqCst ) ;
170+ time_stamp_events_running_signal. store( false , std:: sync:: atomic:: Ordering :: SeqCst ) ;
171+ }
172+ }
173+
174+ // return success
175+ Ok ( ( ) )
107176}
0 commit comments