Skip to content

Commit c475786

Browse files
Merge pull request #242 from Zadamsa/biflow-aggregator-ontime-exports
Biflow aggregator - add global flush
2 parents 74ba22f + d9fde3d commit c475786

22 files changed

+320
-8
lines changed

biflow_aggregator/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,5 @@ biflow_aggregator_SOURCES=main.cpp fields.c fields.h configuration.cpp configura
55
rapidxml.hpp
66
biflow_aggregator_LDADD=-lunirec -ltrap
77
include ../aminclude.am
8+
9+
TESTS = tests/test.sh

biflow_aggregator/configuration.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,27 @@ bool Configuration::get_eof_termination() noexcept
7171
return _eof_terminate;
7272
}
7373

74+
void Configuration::set_global_flush_configuration(const char *input)
75+
{
76+
std::size_t mode_start_index;
77+
_global_flush_configuration.interval = std::stoul(input, &mode_start_index);
78+
if (std::strcmp(input + mode_start_index, "a") == 0 ||
79+
std::strcmp(input + mode_start_index, "absolute") == 0) {
80+
_global_flush_configuration.type = Global_flush_configuration::Type::ABSOLUTE;
81+
} else if (std::strcmp(input + mode_start_index, "r") == 0 ||
82+
std::strcmp(input + mode_start_index, "relative") == 0 ||
83+
std::strcmp(input + mode_start_index, "") == 0) {
84+
_global_flush_configuration.type = Global_flush_configuration::Type::RELATIVE;
85+
} else {
86+
throw std::invalid_argument("Invalid flush timeout format. Expected: <interval> [a|absolute|r|relative|<empty for relative>].");
87+
}
88+
}
89+
90+
Configuration::Global_flush_configuration Configuration::get_global_flush_configuration() noexcept
91+
{
92+
return _global_flush_configuration;
93+
}
94+
7495
void Configuration::print() noexcept
7596
{
7697
std::cout << "***** Configuration *****" << std::endl;

biflow_aggregator/configuration.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,35 @@
2323
* @brief Class thas holds module configuration
2424
*/
2525
class Configuration {
26+
public:
27+
28+
/**
29+
* @brief Global flush configuration
30+
*
31+
* Flush interval is used to flush all records in flow cache to output interface once per given amount of seconds.
32+
* If not set, no flush is performed.
33+
*/
34+
struct Global_flush_configuration {
35+
enum class Type {
36+
ABSOLUTE, ///< Flows must be flushed every interval seconds starting from epoch
37+
RELATIVE, ///< Flows must be flushed every interval seconds starting from module start
38+
} type;
39+
time_t interval = 0; ///< Interval in seconds
40+
41+
/**
42+
* @brief Check if flush interval is set
43+
*
44+
* @return true Flush interval is set
45+
* @return false Flush interval is not set
46+
*/
47+
[[nodiscard]] inline
48+
bool is_set() const noexcept
49+
{
50+
return interval > 0;
51+
}
52+
};
53+
54+
private:
2655

2756
/**
2857
* @brief Configuration of fields from config file.
@@ -45,6 +74,14 @@ class Configuration {
4574
*/
4675
time_t _t_passive;
4776

77+
/**
78+
* @brief Periodic flush configuration
79+
*
80+
* If set, module flush all records in flow cache to output interface once per given amount of seconds.
81+
* If flush interval is set to 0, no flush is performed.
82+
*/
83+
Global_flush_configuration _global_flush_configuration;
84+
4885
/**
4986
* @brief active timeout
5087
*
@@ -155,6 +192,20 @@ class Configuration {
155192
*/
156193
time_t get_active_timeout() noexcept;
157194

195+
/**
196+
* @brief Set the flush timeout
197+
*
198+
* See _periodic_flush_configuration for more info.
199+
*
200+
* @param input Timeout in text format.
201+
*/
202+
void set_global_flush_configuration(const char *input);
203+
204+
/**
205+
* @brief Get the flush timeout object
206+
*/
207+
Global_flush_configuration get_global_flush_configuration() noexcept;
208+
158209
/**
159210
* @brief Set the passive timeout
160211
*

biflow_aggregator/main.cpp

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ UR_FIELDS (
7979
PARAM('e', "eof", "End when receive EOF.", no_argument, "flag") \
8080
PARAM('s', "size", "Max number of elements in flow cache.", required_argument, "number") \
8181
PARAM('a', "active-timeout", "Active timeout.", required_argument, "number") \
82-
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number")
82+
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") \
83+
PARAM('g', "global-timeout", "Global timeout.", required_argument, "number")
8384

8485
trap_module_info_t *module_info = NULL;
8586
static volatile int stop = 0;
@@ -364,6 +365,20 @@ void update_flow(
364365
if (pt != t_data->value.passive_timeout)
365366
dll.swap(t_data);
366367
}
368+
369+
static void flush_all(agg::Aggregator<agg::FlowKey>& aggregator,
370+
ur_template_t* out_template, void* out_record, Dll<agg::Timeout_data>& dll)
371+
{
372+
for (auto flow_data : aggregator.flow_cache) {
373+
proccess_and_send(aggregator, flow_data.first, flow_data.second, out_template, out_record);
374+
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
375+
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
376+
}
377+
dll.clear();
378+
aggregator.flow_cache.clear();
379+
trap_send_flush(0);
380+
}
381+
367382
static int
368383
do_mainloop(Configuration& config)
369384
{
@@ -383,8 +398,11 @@ do_mainloop(Configuration& config)
383398

384399
time_t time_first;
385400
time_t time_last = 0;
401+
time_t last_flush_time = 0;
386402
time_t t_passive = config.get_passive_timeout() >> 32;
387403
time_t t_active = config.get_active_timeout() >> 32;
404+
const Configuration::Global_flush_configuration& flush_configuration
405+
= config.get_global_flush_configuration();
388406
std::size_t flow_cnt = 0;
389407
Dll<agg::Timeout_data> dll;
390408

@@ -432,13 +450,7 @@ do_mainloop(Configuration& config)
432450

433451
// clear all memory
434452
// flush all flows
435-
for (auto flow_data : agg.flow_cache) {
436-
proccess_and_send(agg, flow_data.first, flow_data.second, out_tmplt, out_rec);
437-
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
438-
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
439-
}
440-
441-
trap_send_flush(0);
453+
flush_all(agg, out_tmplt, out_rec, dll);
442454

443455
// Free previous record and temlate
444456
ur_free_template(out_tmplt);
@@ -490,6 +502,14 @@ do_mainloop(Configuration& config)
490502
trap_send_flush(0);
491503
timeouted = false;
492504
}
505+
506+
if (unlikely(flush_configuration.is_set() && time_last - last_flush_time >= flush_configuration.interval)) {
507+
last_flush_time = time_last;
508+
if (flush_configuration.type == Configuration::Global_flush_configuration::Type::ABSOLUTE) {
509+
last_flush_time = last_flush_time / flush_configuration.interval * flush_configuration.interval;
510+
}
511+
flush_all(agg, out_tmplt, out_rec, dll);
512+
}
493513

494514
bool is_key_reversed = key.generate(in_data, in_tmplt, config.is_biflow_key());
495515

@@ -662,6 +682,9 @@ main(int argc, char **argv)
662682
case 's':
663683
config.set_flow_cache_size(optarg);
664684
break;
685+
case 'g':
686+
config.set_global_flush_configuration(optarg);
687+
break;
665688
default:
666689
std::cerr << "Invalid argument " << opt << ", skipped..." << std::endl;
667690
}

biflow_aggregator/tests/config.xml

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?xml version="1.0"?>
2+
<aggregator>
3+
<id name="packet_aggregation">
4+
<field>
5+
<type>SUM</type>
6+
<name>PACKETS</name>
7+
</field>
8+
</id>
9+
<id name="generic_flow_key_min_ports">
10+
<field>
11+
<name>FLOW_ID</name>
12+
<type>KEY</type>
13+
</field>
14+
15+
<field>
16+
<name>SRC_PORT</name>
17+
<type>MIN</type>
18+
</field>
19+
<field>
20+
<name>DST_PORT</name>
21+
<type>MIN</type>
22+
</field>
23+
</id>
24+
<id name="src_dst_ip_all_aggregations">
25+
<field>
26+
<name>SRC_IP</name>
27+
<type>KEY</type>
28+
</field>
29+
<field>
30+
<name>DST_IP</name>
31+
<type>KEY</type>
32+
</field>
33+
34+
<field>
35+
<name>SUM</name>
36+
<type>SUM</type>
37+
</field>
38+
<field>
39+
<name>MIN</name>
40+
<type>MIN</type>
41+
</field>
42+
<field>
43+
<name>MAX</name>
44+
<type>MAX</type>
45+
</field>
46+
<field>
47+
<name>FIRST_NON_EMPTY</name>
48+
<type>FIRST_NON_EMPTY</type>
49+
</field>
50+
<field>
51+
<name>LAST_NON_EMPTY</name>
52+
<type>LAST_NON_EMPTY</type>
53+
</field>
54+
<field>
55+
<name>FIRST</name>
56+
<type>FIRST</type>
57+
</field>
58+
<field>
59+
<name>LAST</name>
60+
<type>LAST</type>
61+
</field>
62+
<field>
63+
<name>AVG</name>
64+
<type>AVG</type>
65+
</field>
66+
<field>
67+
<name>BITOR</name>
68+
<type>BITOR</type>
69+
</field>
70+
<field>
71+
<name>STR_APPEND</name>
72+
<type>APPEND</type>
73+
<delimiter>:</delimiter>
74+
<size>10</size>
75+
</field>
76+
<field>
77+
<name>SORTED_MERGE_VALUE</name>
78+
<type>SORTED_MERGE</type>
79+
<delimiter>:</delimiter>
80+
<sort_key>SORTED_MERGE_KEY</sort_key>
81+
<sort_type>ASCENDING</sort_type>
82+
<size>10</size>
83+
</field>
84+
</id>
85+
</aggregator>
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST
2+
192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0
3+
192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST
2+
192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0
3+
192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0
4+
192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0
5+
192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0
6+
192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:41.0
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST,uint32 FLOW_ID,uint16 SRC_PORT,uint16 DST_PORT
2+
192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,1,0,6666
3+
192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0,1,6666,0
4+
192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0,2,6666,0
5+
192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0,2,3333,3333
6+
192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:41.0,2,0,6666
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ipaddr DST_IP,ipaddr SRC_IP,time TIME_FIRST,time TIME_LAST,uint32 SUM,uint32 MIN,uint32 MAX,string FIRST_NON_EMPTY,string LAST_NON_EMPTY,uint32 FIRST,uint32 LAST,double AVG,string STR_APPEND,uint32 BITOR,uint32* SORTED_MERGE_KEY,uint32* SORTED_MERGE_VALUE
2+
192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,333,5,5,,222,16,32,7,test1,1,[3|2|1],[1|2|3]
3+
192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:11.0,333,1,4,test,,555,33,9,test2,3,[8|7|6],[6|5|4]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667

0 commit comments

Comments
 (0)