Commit 54fae00

Adding Kafka streaming example script
This script implements flow data aggregation and streams the results to a Kafka topic in CBOR format. It can easily be changed to use a different format or to send different data. Consider it an example of what is possible and modify it to your needs.
1 parent 52f6a70 commit 54fae00

File tree

3 files changed (+245, −0)


kafka-streaming/README.md

Lines changed: 114 additions & 0 deletions
# Flow data to Kafka streaming

This script implements flow data aggregation and streams the results to a Kafka topic in CBOR format. It can easily be changed to use a different format or to send different data. Consider it an example of what is possible and modify it to your needs.

It has been tested with Flowmon system version 12.3.
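As a sketch of how the output format could be swapped: the script hands `KafkaProducer` a `value_serializer` callable that turns each record into bytes, so changing the format only means changing that callable. The JSON variant below is a hypothetical alternative, not part of the committed script:

```python
import json

# Hypothetical alternative serializer: JSON instead of CBOR.
# The committed script uses: value_serializer=lambda m: cbor.dumps(m)
# A JSON-producing equivalent would be:
serialize = lambda m: json.dumps(m).encode("utf-8")

# It would be passed to the producer as:
#   KafkaProducer(bootstrap_servers=..., value_serializer=serialize)
print(serialize({"dst_ctr": "203", "flows": "196"}))
```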

## Prerequisites

To run it you need to create a Python virtual environment on your Flowmon appliance and add the necessary libraries, which aren't present on the Flowmon system. Create the environment with:

    python3 -m venv kafka

Here `kafka` is the name of the virtual environment; if you use a different name, you will need to change it in the script as well. Next, change into the directory (using the name of the virtual environment created above), activate the environment, and install the libraries. The script then needs to be placed in this folder.

    cd kafka
    source bin/activate
    pip3 install kafka-python
    pip3 install cbor
## Using the script

The script is made to run every five minutes; you can add it to the Flowmon user's crontab by editing it with `crontab -e`. The script keeps its last processed timestamp in a file called `last`, creating the file if it doesn't exist.
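A crontab entry for this could look like the following. The script path is an assumption based on the virtual-environment location used in this README, and the broker host and topic are placeholders; adjust all three to your setup:

```
*/5 * * * * /home/flowmon/kafka/stream-flow.py -i <broker-host> -p 9092 -t network-metadata
```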
When you need to test the script multiple times, delete this file between runs; without it, the script rounds the current time down to the previous 5-minute interval and uses that timestamp for the analysis run by the nfdump console command.
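The round-down itself is simple datetime arithmetic; a minimal sketch of the behaviour (mirroring the script's `roundDownDateTime` function):

```python
import datetime

def round_down_5min(dt):
    # Drop the minutes past the last 5-minute boundary; seconds and
    # microseconds are discarded by reconstructing the datetime.
    return datetime.datetime(dt.year, dt.month, dt.day,
                             dt.hour, dt.minute - dt.minute % 5)

print(round_down_5min(datetime.datetime(2023, 11, 30, 11, 33, 42)))
# → 2023-11-30 11:30:00
```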

The command to get the aggregation is present in the function `get_data`:

    command = f"/usr/local/bin/nfdump -M /data/nfsen/profiles-data/live/'127-0-0-1_p3000:127-0-0-1_p2055' -r {timestamp} -A 'dstctry' -o 'fmt:%ts,%dcc,%td,%pkt,%byt,%pps,%bps,%fl' -6 --no-scale-number"

The result of running this command on the SSH command line could look like the following:

    Date first seen          Dst Ctry  Duration  Packets  Bytes  pps  bps  Flows
    2023-11-30 11:29:22.585, 203, 302.210, 760, 58348, 2, 1544, 196
    2023-11-30 11:29:39.261, 826, 271.966, 55, 4541, 0, 133, 15
    2023-11-30 11:30:08.502, 372, 227.322, 189, 81984, 0, 2885, 13
    2023-11-30 11:30:54.374, 250, 150.388, 351, 195125, 2, 10379, 22
    2023-11-30 11:27:04.546, 840, 468.700, 4592, 1172714, 9, 20016, 1486
    2023-11-30 11:30:06.511, 276, 200.593, 84, 10942, 0, 436, 5
    2023-11-30 11:32:00.974, 100, 0.000, 1, 76, 0, 0, 1
    2023-11-30 11:25:03.508, 0, 594.975, 829590, 893719438, 1394, 12016900, 14558
    2023-11-30 11:29:44.087, 528, 297.434, 676, 204732, 2, 5506, 42
    Summary: total flows: 16338, total bytes: 895447900, total packets: 836298, avg bps: 12040141, avg pps: 1405, avg bpp: 1070
    Time window: 2023-11-30 11:25:03 - 2023-11-30 11:35:00
    Total flows processed: 16338, Blocks skipped: 0, Bytes read: 5883516
    Sys: 0.028s flows/second: 569427.0 Wall: 0.010s flows/second: 1603966.2

The easiest way to get the command for aggregation is to run the query in the Monitoring Center Analysis until you get the results you are after. Do not forget to select all fields you want to use for aggregation, filter the data (if needed), select the proper output format, and limit the number of results to those which interest you. Also select the right profile and channels from which you want to get the data.

![Monitoring Center Analysis query](media/image1.png)

Once you click on the black terminal window icon, it will give you the statistics command. It will look like the example above, so you can replace the command between the quotes. Just change `-R` to `-r {timestamp}` as in the example, so that the timestamp of the analyzed data changes with each run.

When you modify the command, you will also need to modify the function `process_records`, as the records will have a different format based on the output fields you selected.
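As a hypothetical illustration of such a modification, suppose the aggregation were switched to source IP addresses (`-A 'srcip'` with output `'fmt:%ts,%sa,%byt,%fl'`); the parser would then index the comma-separated fields accordingly. The field names and format string here are assumptions, not the committed configuration:

```python
# Hypothetical adaptation of process_records for an aggregation by
# source IP ('srcip') with output format 'fmt:%ts,%sa,%byt,%fl'.
def process_records(data):
    records = []
    lines = data.splitlines()
    lines.pop(0)   # column header
    lines.pop()    # nfdump processing stats ("Sys: ...")
    lines.pop()    # "Total flows processed: ..."
    lines.pop()    # time window line
    lines.pop()    # summary line
    for line in lines:
        rows = line.split(',')
        records.append({'first_seen': rows[0],
                        'src_ip': rows[1].strip(),
                        'bytes': rows[2].strip(),
                        'flows': rows[3].strip()})
    return records

# Illustrative sample in the same layout as real nfdump output
sample = "\n".join([
    "Date first seen, Src IP Addr, Bytes, Flows",
    "2023-11-30 11:29:22.585, 192.0.2.10, 58348, 196",
    "2023-11-30 11:29:39.261, 198.51.100.7, 4541, 15",
    "Summary: total flows: 211, total bytes: 62889",
    "Time window: 2023-11-30 11:25:03 - 2023-11-30 11:35:00",
    "Total flows processed: 211, Blocks skipped: 0, Bytes read: 1024",
    "Sys: 0.002s flows/second: 105500.0 Wall: 0.001s",
])
print(process_records(sample))
```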

The script supports three arguments:

    -i HOST, --host HOST     IP address/hostname of the bootstrap server
    -p PORT, --port PORT     Port of the running bootstrap server
    -t TOPIC, --topic TOPIC  Kafka topic to stream to
There is a log file located in the script folder (by default kafka/kafka-stream.log) which can help you with troubleshooting. The appliance does require a connection from its external IP to the configured bootstrap Kafka server so that it can connect and send data to the specified topic.

kafka-streaming/media/analysis.png

43.8 KB

kafka-streaming/stream-flow.py

Lines changed: 131 additions & 0 deletions
#!/home/flowmon/kafka/bin/python3
# -*- coding: utf-8 -*-
"""
Aggregate flow data with nfdump and stream the results to Kafka.
"""

import argparse
import datetime
import logging
import shlex
import subprocess

import cbor
from kafka import KafkaProducer

LOGGING_FORMAT = '%(asctime)s - %(module)s - %(levelname)s : %(message)s'
logging.basicConfig(filename='/home/flowmon/kafka/kafka-stream.log', format=LOGGING_FORMAT, level=logging.DEBUG)

def parse_arguments():
    parser = argparse.ArgumentParser(prog='stream-flow.py')
    parser.add_argument("-i", "--host", action='store', type=str, help="IP address/hostname of the bootstrap server", required=True)
    parser.add_argument("-p", "--port", action='store', type=int, help="Port of the running bootstrap server", default=9092)
    parser.add_argument("-t", "--topic", action='store', type=str, help="Kafka topic to stream to", default='network-metadata')
    arguments = vars(parser.parse_args())
    return arguments

def roundDownDateTime(dt):
    # Round down to the previous 5-minute boundary
    delta_min = dt.minute % 5
    return datetime.datetime(dt.year, dt.month, dt.day,
                             dt.hour, dt.minute - delta_min)

def run_command(command_line):
    arguments = shlex.split(command_line)
    p = subprocess.Popen(arguments, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    std_data = p.communicate()
    output = (p.returncode, std_data[0].decode("utf8"), std_data[1].decode("utf8"))
    return output

def process_records(data):
    records = []
    lines = data.splitlines()
    # Get rid of the header line
    lines.pop(0)
    # Get the processing statistics and send them to the log
    sysstat = lines.pop()
    logging.debug(f"nfdump processing stats: {sysstat}")
    # Number of processed flows
    totals = lines.pop()
    logging.debug(totals)
    # Time window information we just drop
    lines.pop()
    # Same for the summary stats
    lines.pop()
    # Now only the data lines remain
    for line in lines:
        rows = line.split(',')
        record = {'first_seen': rows[0],
                  'dst_ctr': rows[1].strip(),
                  'duration': rows[2].strip(),
                  'packets': rows[3].strip(),
                  'bytes': rows[4].strip(),
                  'pps': rows[5].strip(),
                  'bps': rows[6].strip(),
                  'flows': rows[7].strip()}
        records.append(record)
    return records

def get_data(timestamp):
    # Get the data from the collector
    command = f"/usr/local/bin/nfdump -M /data/nfsen/profiles-data/live/'127-0-0-1_p3000:127-0-0-1_p2055' -r {timestamp} -A 'dstctry' -o 'fmt:%ts,%dcc,%td,%pkt,%byt,%pps,%bps,%fl' -6 --no-scale-number"
    logging.debug(command)
    output = run_command(command)
    if output[0] == 0:
        logging.debug("Command processed successfully.")
        return output[1]
    else:
        logging.error(output)
        return None

def on_success(metadata):
    logging.info(f"Message produced to topic '{metadata.topic}' at offset {metadata.offset}")

def on_error(e):
    logging.error(f"Error sending message: {e}")

def get_timestamp():
    try:
        file = open('/home/flowmon/kafka/last', 'r')
        datestamp = file.read()
        file.close()
        dateobj = datetime.datetime.strptime(datestamp, "%Y%m%d%H%M")
        dateob_5 = dateobj + datetime.timedelta(minutes=5)
        # Persist the new timestamp so the next run advances again
        file = open('/home/flowmon/kafka/last', 'w')
        file.write(dateob_5.strftime("%Y%m%d%H%M"))
        file.close()
        return dateob_5
    except IOError:
        # No 'last' file yet: start from the previous 5-minute interval
        current = datetime.datetime.now()
        file = open('/home/flowmon/kafka/last', 'w+')
        dateob_5 = current - datetime.timedelta(minutes=5)
        rounded = roundDownDateTime(dateob_5)
        str_time = rounded.strftime("%Y%m%d%H%M")
        file.write(str_time)
        file.close()
        return rounded

def kafka_stream(args, records):
    producer = KafkaProducer(bootstrap_servers=f"{args['host']}:{args['port']}",
                             value_serializer=lambda m: cbor.dumps(m))

    for record in records:
        stream = producer.send(args['topic'], record)
        stream.add_callback(on_success)
        stream.add_errback(on_error)

    producer.flush()
    producer.close()

def main():
    logging.info('------- New run -------')
    args = parse_arguments()
    timestamp = get_timestamp()
    str_time = timestamp.strftime("%Y%m%d%H%M")
    full_path = timestamp.strftime("%Y/%m/%d/") + "nfcapd." + str_time
    logging.debug('Processing {}'.format(full_path))
    data = get_data(full_path)
    if data is None:
        logging.error('No data returned by nfdump, aborting this run')
        return
    records = process_records(data)
    kafka_stream(args, records)

    logging.info('Everything is done')

if __name__ == "__main__":
    main()
