Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
ACLOCAL_AMFLAGS = -I m4

# Biflow aggregator depends on logger and logreplay
SUBDIRS= \
aggregator \
anonymizer \
backscatter \
biflow_aggregator \
debug_sender \
device_classifier \
email_reporter \
Expand All @@ -31,6 +31,7 @@ topn \
traffic_repeater \
unirec2json \
endiverter \
biflow_aggregator \
googletest_example

EXTRA_DIST = AUTHORS COPYING ChangeLog INSTALL NEWS README.md nfreader \
Expand Down
7 changes: 7 additions & 0 deletions biflow_aggregator/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,10 @@ biflow_aggregator_SOURCES=main.cpp fields.c fields.h configuration.cpp configura
rapidxml.hpp
biflow_aggregator_LDADD=-lunirec -ltrap
include ../aminclude.am

TESTS = tests/test.sh

EXTRA_DIST = tests/test.sh \
tests/references \
tests/inputs \
tests/config.xml
40 changes: 38 additions & 2 deletions biflow_aggregator/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ int Field_template::assign_append() noexcept
return 0;
}

template<typename T>
int Field_template::assign_unique_count() noexcept
{
typename_size = sizeof(T);
ag_fnc = unique_count<T>;
post_proc_fnc = Unique_count_data<T>::postprocessing;
init_fnc = Unique_count_data<T>::init;
deinit_fnc = Unique_count_data<T>::deinit;
ag_data_size = sizeof(Unique_count_data<T>);
return 0;
}

template<Field_type ag_type, typename T>
int Field_template::assign_min_max() noexcept
{
Expand Down Expand Up @@ -431,6 +443,23 @@ int Field_template::set_templates(const Field_type ag_type, const ur_field_type_
std::cerr << "Only string and int, uint, float, double, mac, time, and IP array can be used to APPEND function." << std::endl;
return 1;
}
case UNIQUE_COUNT:
switch (ur_f_type) {
case UR_TYPE_UINT8: return assign_unique_count<uint8_t>();
case UR_TYPE_INT8: return assign_unique_count<int8_t>();
case UR_TYPE_UINT16: return assign_unique_count<uint16_t>();
case UR_TYPE_INT16: return assign_unique_count<int16_t>();
case UR_TYPE_UINT32: return assign_unique_count<uint32_t>();
case UR_TYPE_INT32: return assign_unique_count<int32_t>();
case UR_TYPE_UINT64: return assign_unique_count<uint64_t>();
case UR_TYPE_INT64: return assign_unique_count<int64_t>();
case UR_TYPE_MAC: return assign_unique_count<Mac_addr>();
case UR_TYPE_IP: return assign_unique_count<uint128_t>();
case UR_TYPE_STRING: return assign_unique_count<char>();
default:
std::cerr << "Only string and int, uint, float, double, mac, time, and IP array can be used to UNIQUE_COUNT function." << std::endl;
return 1;
}
default:
assert("Invalid case option.\n");
return 1;
Expand Down Expand Up @@ -732,8 +761,10 @@ int Field_template::set_templates_dir(const ur_field_type_t ur_f_type, const ur_
}
}

Field::Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid) :
ur_fid(ur_fid), ur_r_fid(ur_r_fid), ur_sort_key_id(0), ur_sort_key_type()
Field::Field(const Field_config cfg, const ur_field_id_t ur_fid_in, const ur_field_id_t ur_r_fid_in,
const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out)
: ur_fid(ur_fid_in), ur_r_fid(ur_r_fid_in), ur_fid_out(ur_fid_out),
ur_r_fid_out(ur_r_fid_out), ur_sort_key_id(0), ur_sort_key_type()
{
ur_field_type_t ur_field_type = ur_get_type(ur_fid);

Expand Down Expand Up @@ -813,6 +844,11 @@ void Fields::init(uint8_t *memory)
data.first.init(memory, &cfg);
break;
}
case UNIQUE_COUNT: {
struct Config_unique_count cfg = {data.first.limit};
data.first.init(memory, &cfg);
break;
}
case SORTED_MERGE:
case SORTED_MERGE_DIR: {
struct Config_sorted_merge cfg = {data.first.limit, data.first.delimiter, data.first.sort_type};
Expand Down
21 changes: 19 additions & 2 deletions biflow_aggregator/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum Field_type {
LAST,
LAST_NON_EMPTY,
APPEND,
UNIQUE_COUNT,
SORTED_MERGE,
SORTED_MERGE_DIR,
INVALID_TYPE,
Expand Down Expand Up @@ -95,6 +96,9 @@ class Field_template {
template<typename T>
int assign_append() noexcept;

template<typename T>
int assign_unique_count() noexcept;

template<Field_type ag_type, typename T>
int assign_min_max() noexcept;

Expand Down Expand Up @@ -275,7 +279,7 @@ struct Field_config {
char delimiter;

/**
* @brief Max size of append and sortd merge data
* @brief Max size of data
*/
std::size_t limit;

Expand All @@ -301,6 +305,16 @@ class Field : public Field_config, public Field_template {
*/
ur_field_id_t ur_r_fid;

/**
* @brief ID of output unirec field
*/
ur_field_id_t ur_fid_out;

/**
* @brief Reverse ID of output unirec field
*/
ur_field_id_t ur_r_fid_out;

/**
* @brief ID of sort key unirec field
*
Expand All @@ -321,8 +335,11 @@ class Field : public Field_config, public Field_template {
* @param cfg Field configuration
* @param ur_fid Field ID
* @param ur_r_fid Reverse field ID
* @param ur_fid_out Output field ID
* @param ur_r_fid_out Reverse output field ID
*/
Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid);
Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid,
const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out);

/**
* @brief Call field init function.
Expand Down
71 changes: 71 additions & 0 deletions biflow_aggregator/aggregator_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <iostream>
#include <limits>
#include <nemea-common/BloomFilter.hpp>

#include <unirec/unirec.h>

Expand Down Expand Up @@ -174,6 +175,50 @@ struct Append_data : Config_append {
}
};

/**
* @brief Configuration for unique count function
*/
struct Config_unique_count {
std::size_t filter_size;
};

/**
* @brief Structure used to store data for unique count function.
*/
template<typename T>
struct Unique_count_data : Config_unique_count {
bloom_filter filter;
std::size_t unique_count;

Unique_count_data(const bloom_parameters& parameters)
: filter(parameters)
{}

static inline void init(void* memory, const void* config)
{
const auto* config_unique_count = static_cast<const Config_unique_count*>(config);
bloom_parameters parameters;
parameters.projected_element_count = config_unique_count->filter_size;
parameters.false_positive_probability = 0.01;
parameters.compute_optimal_parameters();
new(memory) Unique_count_data<T>(parameters);
}

static inline void deinit(void* memory)
{
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(memory);
unique_count_data->unique_count = 0;
unique_count_data->~Unique_count_data<T>();
}

static inline const void* postprocessing(void* memory, std::size_t& elem_cnt) noexcept
{
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(memory);
elem_cnt = 1;
return static_cast<void*>(&unique_count_data->unique_count);
}
};

/**
* @brief Configuration to sorted merge function
*/
Expand Down Expand Up @@ -519,6 +564,32 @@ inline void append(const void *src, void *dst) noexcept
static_cast<const T*>(src_data->ptr_first) + src_data->cnt_elements);
}

/**
* @brief Inserts element from src pointer into bloom filter.
* @tparam T template type variable.
* @param [in] src pointer to source of new data.
* @param [in,out] dst pointer to already stored data which will be updated (modified).
*/
template<typename T>
inline void unique_count(const void* src, void* dst) noexcept
{
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(dst);

if (std::is_same<T, char>::value) {
const ur_array_data* src_data = (static_cast<const ur_array_data*>(src));
if (src_data->cnt_elements != 0 && !unique_count_data->filter.contains(
static_cast<const unsigned char*>(src_data->ptr_first), src_data->cnt_elements)) {
unique_count_data->filter.insert(static_cast<const unsigned char*>(src_data->ptr_first), src_data->cnt_elements);
unique_count_data->unique_count++;
}
return;
}
if (!unique_count_data->filter.contains(static_cast<const unsigned char*>(src), sizeof(T))) {
unique_count_data->filter.insert(static_cast<const unsigned char*>(src), sizeof(T));
unique_count_data->unique_count++;
}
}

template <typename T, typename K>
inline void sorted_merge(const void *src, void *dst) noexcept
{
Expand Down
22 changes: 22 additions & 0 deletions biflow_aggregator/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ bool Configuration::get_eof_termination() noexcept
return _eof_terminate;
}

void Configuration::set_global_flush_configuration(const char *input)
{
std::size_t mode_start_index;
_global_flush_configuration.interval = std::stoul(input, &mode_start_index);
if (std::strcmp(input + mode_start_index, "a") == 0 ||
std::strcmp(input + mode_start_index, "absolute") == 0) {
_global_flush_configuration.type = Global_flush_configuration::Type::ABSOLUTE;
} else if (std::strcmp(input + mode_start_index, "r") == 0 ||
std::strcmp(input + mode_start_index, "relative") == 0 ||
std::strcmp(input + mode_start_index, "") == 0) {
_global_flush_configuration.type = Global_flush_configuration::Type::RELATIVE;
} else {
throw std::invalid_argument("Invalid flush timeout format. Expected: <interval> [a|absolute|r|relative|<empty for relative>].");
}
}

Configuration::Global_flush_configuration Configuration::get_global_flush_configuration() noexcept
{
return _global_flush_configuration;
}

void Configuration::print() noexcept
{
std::cout << "***** Configuration *****" << std::endl;
Expand Down Expand Up @@ -107,6 +128,7 @@ agg::Field_type Configuration::get_field_type(const char *input)
if (!std::strcmp(input, "BITAND")) return agg::BIT_AND;
if (!std::strcmp(input, "BITOR")) return agg::BIT_OR;
if (!std::strcmp(input, "APPEND")) return agg::APPEND;
if (!std::strcmp(input, "UNIQUE_COUNT")) return agg::UNIQUE_COUNT;
if (!std::strcmp(input, "SORTED_MERGE")) return agg::SORTED_MERGE;
if (!std::strcmp(input, "SORTED_MERGE_DIR")) return agg::SORTED_MERGE_DIR;
std::cerr << "Invalid type field. Given: " << input << ", Expected: KEY|SUM|MIN|MAX|AVG|FIRST|FIRST_NON_EMPTY|LAST|LAST_NON_EMPTY|BITAND|BITOR|APPEND|SORTED_MERGE|SORTED_MERGE_DIR." << std::endl;
Expand Down
51 changes: 51 additions & 0 deletions biflow_aggregator/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,35 @@
* @brief Class thas holds module configuration
*/
class Configuration {
public:

/**
* @brief Global flush configuration
*
* Flush interval is used to flush all records in flow cache to output interface once per given amount of seconds.
* If not set, no flush is performed.
*/
struct Global_flush_configuration {
enum class Type {
ABSOLUTE, ///< Flows must be flushed every interval seconds starting from epoch
RELATIVE, ///< Flows must be flushed every interval seconds starting from module start
} type;
time_t interval = 0; ///< Interval in seconds

/**
* @brief Check if flush interval is set
*
* @return true Flush interval is set
* @return false Flush interval is not set
*/
[[nodiscard]] inline
bool is_set() const noexcept
{
return interval > 0;
}
};

private:

/**
* @brief Configuration of fields from config file.
Expand All @@ -45,6 +74,14 @@ class Configuration {
*/
time_t _t_passive;

/**
* @brief Periodic flush configuration
*
* If set, module flush all records in flow cache to output interface once per given amount of seconds.
* If flush interval is set to 0, no flush is performed.
*/
Global_flush_configuration _global_flush_configuration;

/**
* @brief active timeout
*
Expand Down Expand Up @@ -155,6 +192,20 @@ class Configuration {
*/
time_t get_active_timeout() noexcept;

/**
* @brief Set the flush timeout
*
* See _periodic_flush_configuration for more info.
*
* @param input Timeout in text format.
*/
void set_global_flush_configuration(const char *input);

/**
* @brief Get the flush timeout object
*/
Global_flush_configuration get_global_flush_configuration() noexcept;

/**
* @brief Set the passive timeout
*
Expand Down
Loading
Loading