Add new filters allowing FLP-based dedup #640

Merged
merged 2 commits into from
Jun 26, 2024
Changes from 1 commit
Add new filters allowing FLP-based dedup
- New "remove_entry_all_satisfied" filter type: an entry is removed only if
  all the conditions (represented by nested rules) are satisfied. This
  allows a logical AND in filter conditions, whereas previously only a
  logical OR was possible.

- New "conditional_sampling" filter type: allows random sampling based on
  conditions. For example, a flow matching conditions A and B may have a
  sampling ratio of 1:10, whereas a flow matching condition C has 1:100
  sampling and all other flows are kept 1:1.

- Introduced a "preprocess" function on rules; currently it is only used
  to cast the `Value interface{}` to an int (otherwise it comes as a
  float64), but it could also be used in the future for other purposes,
  e.g. regex pre-compiling.

- Add tests
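
To illustrate the AND semantics described for "remove_entry_all_satisfied", here is a minimal, self-contained Go sketch. It is not the code from this PR: `Entry`, `Condition`, `satisfied` and `allSatisfied` are hypothetical names, the field names are made up, and the handling of missing fields in the equality checks and of an empty condition list is an assumption.

```go
package main

import "fmt"

// Entry stands in for a flow log record (hypothetical type for this sketch).
type Entry map[string]interface{}

// Condition loosely mirrors a nested rule: a remove_entry_* type,
// an input field and an optional value to compare against.
type Condition struct {
	Type  string
	Input string
	Value interface{}
}

// satisfied reports whether a single condition holds for the entry.
// Assumption: the equality checks only apply when the field exists.
func satisfied(e Entry, c Condition) bool {
	v, exists := e[c.Input]
	switch c.Type {
	case "remove_entry_if_exists":
		return exists
	case "remove_entry_if_doesnt_exist":
		return !exists
	case "remove_entry_if_equal":
		return exists && v == c.Value
	case "remove_entry_if_not_equal":
		return exists && v != c.Value
	}
	return false
}

// allSatisfied is the logical AND: true only if every condition holds.
// Assumption: an empty list is vacuously satisfied.
func allSatisfied(e Entry, conds []Condition) bool {
	for _, c := range conds {
		if !satisfied(e, c) {
			return false
		}
	}
	return true
}

func main() {
	conds := []Condition{
		{Type: "remove_entry_if_equal", Input: "direction", Value: 1},
		{Type: "remove_entry_if_exists", Input: "owner"},
	}
	both := Entry{"direction": 1, "owner": "my-deploy"}
	one := Entry{"direction": 1}
	fmt.Println(allSatisfied(both, conds)) // true: both conditions hold, entry would be removed
	fmt.Println(allSatisfied(one, conds))  // false: "owner" is missing, entry would be kept
}
```

With the pre-existing rule types, listing the same two rules separately would remove an entry as soon as either one matched, which is the OR behaviour the commit message refers to.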
jotak committed Mar 22, 2024
commit 0289117e2f770fb849c701d9333bb41929f5d8ee
40 changes: 30 additions & 10 deletions docs/api.md
@@ -159,33 +159,40 @@ Following is the supported API format for filter transformations:
     remove_entry_if_doesnt_exist: removes the entry if the field does not exist
     remove_entry_if_equal: removes the entry if the field value equals specified value
     remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
+    remove_entry_all_satisfied: removes the entry if all of the defined rules are satisfied
     add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
     add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
     add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
     add_regex_if: add output field if input field satisfies regex pattern from parameters field
     add_label: add (input) field to list of labels with value taken from Value field (key=input, value=value)
     add_label_if: add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field
+    conditional_sampling: define conditional sampling rules
 removeField: configuration for remove_field rule
     input: entry input field
     value: specified value of input field:
-removeEntryIfExists: configuration for remove_entry_if_exists rule
-    input: entry input field
-    value: specified value of input field:
-removeEntryIfDoesntExist: configuration for remove_entry_if_doesnt_exist rule
-    input: entry input field
-    value: specified value of input field:
-removeEntryIfEqual: configuration for remove_entry_if_equal rule
-    input: entry input field
-    value: specified value of input field:
-removeEntryIfNotEqual: configuration for remove_entry_if_not_equal rule
+    castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
+removeEntry: configuration for remove_entry_* rules
     input: entry input field
     value: specified value of input field:
+    castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
+removeEntryAllSatisfied: configuration for remove_entry_all_satisfied rule
+    type: (enum) one of the following:
+        remove_entry_if_exists: removes the entry if the field exists
+        remove_entry_if_doesnt_exist: removes the entry if the field does not exist
+        remove_entry_if_equal: removes the entry if the field value equals specified value
+        remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
+    removeEntry: configuration for remove_entry_* rules
+        input: entry input field
+        value: specified value of input field:
+        castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
 addField: configuration for add_field rule
     input: entry input field
     value: specified value of input field:
+    castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
 addFieldIfDoesntExist: configuration for add_field_if_doesnt_exist rule
     input: entry input field
     value: specified value of input field:
+    castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
 addFieldIf: configuration for add_field_if rule
     input: entry input field
     output: entry output field
@@ -199,11 +206,24 @@ Following is the supported API format for filter transformations:
 addLabel: configuration for add_label rule
     input: entry input field
     value: specified value of input field:
+    castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
 addLabelIf: configuration for add_label_if rule
     input: entry input field
     output: entry output field
     parameters: parameters specific to type
     assignee: value needs to assign to output field
+conditionalSampling: sampling configuration rules
+    value: sampling value: 1 flow on <sampling> is kept
+    rules: rules to be satisfied for this sampling configuration
+        type: (enum) one of the following:
+            remove_entry_if_exists: removes the entry if the field exists
+            remove_entry_if_doesnt_exist: removes the entry if the field does not exist
+            remove_entry_if_equal: removes the entry if the field value equals specified value
+            remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
+        removeEntry: configuration for remove_entry_* rules
+            input: entry input field
+            value: specified value of input field:
+            castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
 </pre>
 ## Transform Network API
 Following is the supported API format for network transformations:
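
The `conditionalSampling` documentation above describes a list of conditions, each with a sampling value ("1 flow on <sampling> is kept") and rules to satisfy. The following Go sketch shows one way such a selection could work. It is an illustration only: `samplingFor` and `keep` are hypothetical helpers, the rules are collapsed into a `Match` function for brevity, and "the first matching condition wins, otherwise 1:1" is an assumption that the shown diff does not confirm.

```go
package main

import (
	"fmt"
	"math/rand"
)

// Entry stands in for a flow log record (hypothetical type for this sketch).
type Entry map[string]interface{}

// SamplingCondition is a simplified stand-in for api.SamplingCondition:
// Value is the sampling ratio (keep 1 flow in Value) and Match replaces
// the list of nested rules that must all be satisfied.
type SamplingCondition struct {
	Value uint16
	Match func(Entry) bool
}

// samplingFor returns the ratio of the first matching condition,
// or 1 (keep every flow) when no condition matches.
func samplingFor(e Entry, conds []SamplingCondition) uint16 {
	for _, c := range conds {
		if c.Match(e) {
			return c.Value
		}
	}
	return 1
}

// keep decides whether a flow survives a 1:n sampling. The random source is
// passed in as intn (same contract as rand.Intn) so it can be made deterministic.
func keep(n uint16, intn func(int) int) bool {
	if n <= 1 {
		return true
	}
	return intn(int(n)) == 0
}

func main() {
	conds := []SamplingCondition{
		// Flows matching A and B: 1:10
		{Value: 10, Match: func(e Entry) bool { return e["A"] == true && e["B"] == true }},
		// Flows matching C: 1:100
		{Value: 100, Match: func(e Entry) bool { return e["C"] == true }},
	}
	rng := rand.New(rand.NewSource(1))
	kept := 0
	for i := 0; i < 10000; i++ {
		if keep(samplingFor(Entry{"C": true}, conds), rng.Intn) {
			kept++
		}
	}
	// Roughly 1% of the flows are expected to be kept.
	fmt.Println("kept", kept, "of 10000 flows matching C")
}
```

Passing the random function as a parameter is a design choice of this sketch; it keeps the sampling decision testable without depending on a particular seed.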
84 changes: 70 additions & 14 deletions pkg/api/transform_filter.go
@@ -21,6 +21,12 @@ type TransformFilter struct {
 	Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
 }

+func (tf *TransformFilter) Preprocess() {
+	for i := range tf.Rules {
+		tf.Rules[i].preprocess()
+	}
+}
+
 type TransformFilterEnum string

 const (
@@ -30,32 +36,66 @@ const (
 	RemoveEntryIfDoesntExist TransformFilterEnum = "remove_entry_if_doesnt_exist" // removes the entry if the field does not exist
 	RemoveEntryIfEqual       TransformFilterEnum = "remove_entry_if_equal"        // removes the entry if the field value equals specified value
 	RemoveEntryIfNotEqual    TransformFilterEnum = "remove_entry_if_not_equal"    // removes the entry if the field value does not equal specified value
+	RemoveEntryAllSatisfied  TransformFilterEnum = "remove_entry_all_satisfied"   // removes the entry if all of the defined rules are satisfied
 	AddField                 TransformFilterEnum = "add_field"                    // adds (input) field to the entry; overrides previous value if present (key=input, value=value)
 	AddFieldIfDoesntExist    TransformFilterEnum = "add_field_if_doesnt_exist"    // adds a field to the entry if the field does not exist
 	AddFieldIf               TransformFilterEnum = "add_field_if"                 // add output field set to assignee if input field satisfies criteria from parameters field
 	AddRegExIf               TransformFilterEnum = "add_regex_if"                 // add output field if input field satisfies regex pattern from parameters field
 	AddLabel                 TransformFilterEnum = "add_label"                    // add (input) field to list of labels with value taken from Value field (key=input, value=value)
 	AddLabelIf               TransformFilterEnum = "add_label_if"                 // add output field to list of labels with value taken from assignee field if input field satisfies criteria from parameters field
+	ConditionalSampling      TransformFilterEnum = "conditional_sampling"         // define conditional sampling rules
 )

+type TransformFilterRemoveEntryEnum string
+
+const (
+	RemoveEntryIfExistsD      TransformFilterRemoveEntryEnum = "remove_entry_if_exists"       // removes the entry if the field exists
+	RemoveEntryIfDoesntExistD TransformFilterRemoveEntryEnum = "remove_entry_if_doesnt_exist" // removes the entry if the field does not exist
+	RemoveEntryIfEqualD       TransformFilterRemoveEntryEnum = "remove_entry_if_equal"        // removes the entry if the field value equals specified value
+	RemoveEntryIfNotEqualD    TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal"    // removes the entry if the field value does not equal specified value
+)
+
 type TransformFilterRule struct {
-	Type                     TransformFilterEnum              `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
-	RemoveField              *TransformFilterGenericRule      `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
-	RemoveEntryIfExists      *TransformFilterGenericRule      `yaml:"removeEntryIfExists,omitempty" json:"removeEntryIfExists,omitempty" doc:"configuration for remove_entry_if_exists rule"`
-	RemoveEntryIfDoesntExist *TransformFilterGenericRule      `yaml:"removeEntryIfDoesntExist,omitempty" json:"removeEntryIfDoesntExist,omitempty" doc:"configuration for remove_entry_if_doesnt_exist rule"`
-	RemoveEntryIfEqual       *TransformFilterGenericRule      `yaml:"removeEntryIfEqual,omitempty" json:"removeEntryIfEqual,omitempty" doc:"configuration for remove_entry_if_equal rule"`
-	RemoveEntryIfNotEqual    *TransformFilterGenericRule      `yaml:"removeEntryIfNotEqual,omitempty" json:"removeEntryIfNotEqual,omitempty" doc:"configuration for remove_entry_if_not_equal rule"`
-	AddField                 *TransformFilterGenericRule      `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
-	AddFieldIfDoesntExist    *TransformFilterGenericRule      `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
-	AddFieldIf               *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
-	AddRegExIf               *TransformFilterRuleWithAssignee `yaml:"addRegexIf,omitempty" json:"addRegexIf,omitempty" doc:"configuration for add_regex_if rule"`
-	AddLabel                 *TransformFilterGenericRule      `yaml:"addLabel,omitempty" json:"addLabel,omitempty" doc:"configuration for add_label rule"`
-	AddLabelIf               *TransformFilterRuleWithAssignee `yaml:"addLabelIf,omitempty" json:"addLabelIf,omitempty" doc:"configuration for add_label_if rule"`
+	Type                    TransformFilterEnum              `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
+	RemoveField             *TransformFilterGenericRule      `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
+	RemoveEntry             *TransformFilterGenericRule      `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
+	RemoveEntryAllSatisfied []*RemoveEntryRule               `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"`
+	AddField                *TransformFilterGenericRule      `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
+	AddFieldIfDoesntExist   *TransformFilterGenericRule      `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
+	AddFieldIf              *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
+	AddRegExIf              *TransformFilterRuleWithAssignee `yaml:"addRegexIf,omitempty" json:"addRegexIf,omitempty" doc:"configuration for add_regex_if rule"`
+	AddLabel                *TransformFilterGenericRule      `yaml:"addLabel,omitempty" json:"addLabel,omitempty" doc:"configuration for add_label rule"`
+	AddLabelIf              *TransformFilterRuleWithAssignee `yaml:"addLabelIf,omitempty" json:"addLabelIf,omitempty" doc:"configuration for add_label_if rule"`
+	ConditionalSampling     []*SamplingCondition             `yaml:"conditionalSampling,omitempty" json:"conditionalSampling,omitempty" doc:"sampling configuration rules"`
 }

+func (r *TransformFilterRule) preprocess() {
+	if r.RemoveField != nil {
+		r.RemoveField.preprocess()
+	}
+	if r.RemoveEntry != nil {
+		r.RemoveEntry.preprocess()
+	}
+	for i := range r.RemoveEntryAllSatisfied {
+		r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
+	}
+	for i := range r.ConditionalSampling {
+		r.ConditionalSampling[i].preprocess()
+	}
+}
+
 type TransformFilterGenericRule struct {
-	Input string      `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
-	Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
+	Input   string      `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
+	Value   interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
+	CastInt bool        `yaml:"castInt,omitempty" json:"castInt,omitempty" doc:"set true to cast the value field as an int (numeric values are float64 otherwise)"`
 }

+func (r *TransformFilterGenericRule) preprocess() {
+	if r.CastInt {
+		if f, ok := r.Value.(float64); ok {
+			r.Value = int(f)
+		}
+	}
+}
+
 type TransformFilterRuleWithAssignee struct {
@@ -64,3 +104,19 @@ type TransformFilterRuleWithAssignee struct {
 	Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"`
 	Assignee   string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
 }
+
+type RemoveEntryRule struct {
+	Type        TransformFilterRemoveEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
+	RemoveEntry *TransformFilterGenericRule    `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
+}
+
+type SamplingCondition struct {
+	Value uint16             `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on <sampling> is kept"`
+	Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
+}
+
+func (s *SamplingCondition) preprocess() {
+	for i := range s.Rules {
+		s.Rules[i].RemoveEntry.preprocess()
+	}
+}
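
The `castInt` option exists because a number decoded into an `interface{}` by `encoding/json` is a `float64`, which never compares equal to an `int` held in another `interface{}`. The standalone sketch below reproduces that effect. `GenericRule` is a local copy of the three fields shown in the diff (JSON tags only), `decode` is a hypothetical helper, and the `dstPort` field name and the assumption that the flow entry holds an `int` are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// GenericRule is a local copy of TransformFilterGenericRule's fields,
// reduced to JSON tags so the example has no external dependency.
type GenericRule struct {
	Input   string      `json:"input,omitempty"`
	Value   interface{} `json:"value,omitempty"`
	CastInt bool        `json:"castInt,omitempty"`
}

// preprocess mirrors the method from the diff: when CastInt is set and
// the decoded value is a float64, replace it with the corresponding int.
func (r *GenericRule) preprocess() {
	if r.CastInt {
		if f, ok := r.Value.(float64); ok {
			r.Value = int(f)
		}
	}
}

// decode parses a rule from JSON (hypothetical helper for this sketch).
func decode(raw string) (GenericRule, error) {
	var r GenericRule
	err := json.Unmarshal([]byte(raw), &r)
	return r, err
}

func main() {
	r, err := decode(`{"input":"dstPort","value":443,"castInt":true}`)
	if err != nil {
		panic(err)
	}
	// Assumption: the flow entry stores the port as an int.
	entryValue := interface{}(443)

	fmt.Println(entryValue == r.Value) // false: int(443) vs float64(443)
	r.preprocess()
	fmt.Println(entryValue == r.Value) // true: both are int(443)
}
```

Doing the conversion once in a preprocessing step, rather than on every comparison, keeps the per-flow path free of type juggling.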
8 changes: 4 additions & 4 deletions pkg/config/pipeline_builder_test.go
@@ -69,8 +69,8 @@ func TestGRPCPipeline(t *testing.T) {
 	pl := NewGRPCPipeline("grpc", api.IngestGRPCProto{Port: 9050, BufferLen: 50})
 	pl = pl.TransformFilter("filter", api.TransformFilter{
 		Rules: []api.TransformFilterRule{{
-			Type:                     "remove_entry_if_doesnt_exist",
-			RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
+			Type:        "remove_entry_if_doesnt_exist",
+			RemoveEntry: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
 		}},
 	})
 	pl = pl.WriteStdout("stdout", api.WriteStdout{Format: "json"})
@@ -110,8 +110,8 @@ func TestKafkaPromPipeline(t *testing.T) {
 	})
 	pl = pl.TransformFilter("filter", api.TransformFilter{
 		Rules: []api.TransformFilterRule{{
-			Type:                     "remove_entry_if_doesnt_exist",
-			RemoveEntryIfDoesntExist: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
+			Type:        "remove_entry_if_doesnt_exist",
+			RemoveEntry: &api.TransformFilterGenericRule{Input: "doesnt_exist"},
 		}},
 	})
 	pl = pl.ConnTrack("conntrack", api.ConnTrack{