diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index a2b87a44a5017..17f3d5f1e6a36 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -257,6 +257,7 @@ following works: - go.uber.org/atomic [MIT License](https://pkg.go.dev/go.uber.org/atomic?tab=licenses) - go.uber.org/multierr [MIT License](https://pkg.go.dev/go.uber.org/multierr?tab=licenses) - golang.org/x/crypto [BSD 3-Clause Clear License](https://github.com/golang/crypto/blob/master/LICENSE) +- golang.org/x/exp [BSD 3-Clause Clear License](https://github.com/golang/exp/blob/master/LICENSE) - golang.org/x/net [BSD 3-Clause Clear License](https://github.com/golang/net/blob/master/LICENSE) - golang.org/x/oauth2 [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/oauth2/blob/master/LICENSE) - golang.org/x/sync [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/sync/blob/master/LICENSE) @@ -267,6 +268,7 @@ following works: - golang.org/x/xerrors [BSD 3-Clause Clear License](https://github.com/golang/xerrors/blob/master/LICENSE) - golang.zx2c4.com/wireguard [MIT License](https://github.com/WireGuard/wgctrl-go/blob/master/LICENSE.md) - golang.zx2c4.com/wireguard/wgctrl [MIT License](https://github.com/WireGuard/wgctrl-go/blob/master/LICENSE.md) +- gonum.org/v1/gonum [BSD 3-Clause "New" or "Revised" License](https://github.com/gonum/gonum/blob/master/LICENSE) - google.golang.org/api [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/google-api-go-client/blob/master/LICENSE) - google.golang.org/genproto [Apache License 2.0](https://github.com/google/go-genproto/blob/master/LICENSE) - google.golang.org/grpc [Apache License 2.0](https://github.com/grpc/grpc-go/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 2388e17df9646..682ce628df416 100644 --- a/go.mod +++ b/go.mod @@ -145,6 +145,7 @@ require ( golang.org/x/sys v0.0.0-20211214234402-4825e8c3871d golang.org/x/text v0.3.7 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20211230205640-daad0b7ba671 + gonum.org/v1/gonum v0.9.3 google.golang.org/api v0.54.0 google.golang.org/genproto v0.0.0-20210827211047-25e5f791fe06 google.golang.org/grpc v1.41.0 @@ -330,6 +331,7 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.0.0-20211202192323-5770296d904e // indirect + golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect golang.org/x/mod v0.5.1 // indirect golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect @@ -357,6 +359,7 @@ require ( modernc.org/token v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect sigs.k8s.io/yaml v1.2.0 // indirect + ) // replaced due to https://github.com/satori/go.uuid/issues/73 diff --git a/go.sum b/go.sum index 696b90b901e78..5aa62c8578085 100644 --- a/go.sum +++ b/go.sum @@ -2794,6 +2794,7 @@ gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/gonum v0.9.3 h1:DnoIG+QAMaF5NvxnGe/oKsgKcAc6PcUyl8q0VetfQ8s= gonum.org/v1/gonum v0.9.3/go.mod h1:TZumC3NeyVQskjXqmyWt4S3bINhy7B4eYwW69EbyX+0= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= +gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= gonum.org/v1/plot v0.9.0/go.mod h1:3Pcqqmp6RHvJI72kgb8fThyUnav364FOsdDo2aGW5lY= diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index faf6de1e25661..868b3f419a62c 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -12,6 +12,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/execd" _ "github.com/influxdata/telegraf/plugins/processors/filepath" _ "github.com/influxdata/telegraf/plugins/processors/ifname" + _ "github.com/influxdata/telegraf/plugins/processors/noise" _ "github.com/influxdata/telegraf/plugins/processors/override" _ "github.com/influxdata/telegraf/plugins/processors/parser" _ "github.com/influxdata/telegraf/plugins/processors/pivot" diff --git a/plugins/processors/noise/README.md b/plugins/processors/noise/README.md new file mode 100644 index 0000000000000..f503a9a10f56d --- /dev/null +++ b/plugins/processors/noise/README.md @@ -0,0 +1,79 @@ +# Noise Processor + +The *Noise* processor is used to add noise to numerical field values. For each field a noise is generated using a defined probability densitiy function and added to the value. The function type can be configured as _Laplace_, _Gaussian_ or _Uniform_. +Depending on the function, various parameters need to be configured: + +## Configuration + +Depending on the choice of the distribution function, the respective parameters must be set. Default settings are `noise_type = "laplacian"` with `mu = 0.0` and `scale = 1.0`: + +```toml +[[processors.noise]] + ## Specified the type of the random distribution. + ## Can be "laplacian", "gaussian" or "uniform". + # type = "laplacian + + ## Center of the distribution. + ## Only used for Laplacian and Gaussian distributions. + # mu = 0.0 + + ## Scale parameter for the Laplacian or Gaussian distribution + # scale = 1.0 + + ## Upper and lower bound of the Uniform distribution + # min = -1.0 + # max = 1.0 + + ## Apply the noise only to numeric fields matching the filter criteria below. + ## Excludes takes precedence over includes. + # include_fields = [] + # exclude_fields = [] +``` + +Using the `include_fields` and `exclude_fields` options a filter can be configured to apply noise only to numeric fields matching it. +The following distribution functions are available. + +### Laplacian + +* `noise_type = laplacian` +* `scale`: also referred to as _diversity_ parameter, regulates the width & height of the function, a bigger `scale` value means a higher probability of larger noise, default set to 1.0 +* `mu`: location of the curve, default set to 0.0 + +### Gaussian + +* `noise_type = gaussian` +* `mu`: mean value, default set to 0.0 +* `scale`: standard deviation, default set to 1.0 + +### Uniform + +* `noise_type = uniform` +* `min`: minimal interval value, default set to -1.0 +* `max`: maximal interval value, default set to 1.0 + +## Example + +Add noise to each value the *Inputs.CPU* plugin generates, except for the _usage\_steal_, _usage\_user_, _uptime\_format_, _usage\_idle_ field and all fields of the metrics _swap_, _disk_ and _net_: + +```toml +[[inputs.cpu]] + percpu = true + totalcpu = true + collect_cpu_time = false + report_active = false + +[[processors.noise]] + scale = 1.0 + mu = 0.0 + noise_type = "laplacian" + include_fields = [] + exclude_fields = ["usage_steal", "usage_user", "uptime_format", "usage_idle" ] + namedrop = ["swap", "disk", "net"] +``` + +Result of noise added to the _cpu_ metric: + +```diff +- cpu map[cpu:cpu11 host:98d5b8dbad1c] map[usage_guest:0 usage_guest_nice:0 usage_idle:94.3999999994412 usage_iowait:0 usage_irq:0.1999999999998181 usage_nice:0 usage_softirq:0.20000000000209184 usage_steal:0 usage_system:1.2000000000080036 usage_user:4.000000000014552] ++ cpu map[cpu:cpu11 host:98d5b8dbad1c] map[usage_guest:1.0078071583066057 usage_guest_nice:0.523063861602435 usage_idle:95.53920223476884 usage_iowait:0.5162661526251292 usage_irq:0.7138529816101375 usage_nice:0.6119678488887954 usage_softirq:0.5573585443688622 usage_steal:0.2006120911289802 usage_system:1.2954475820198437 usage_user:6.885664792615023] +``` diff --git a/plugins/processors/noise/noise.go b/plugins/processors/noise/noise.go new file mode 100644 index 0000000000000..281b501460a4f --- /dev/null +++ b/plugins/processors/noise/noise.go @@ -0,0 +1,156 @@ +package noise + +import ( + "fmt" + "math" + "reflect" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/plugins/processors" + "gonum.org/v1/gonum/stat/distuv" +) + +const ( + defaultScale = 1.0 + defaultMin = -1.0 + defaultMax = 1.0 + defaultMu = 0.0 + defaultNoiseType = "laplacian" +) + +const sampleConfig = ` + ## Specified the type of the random distribution. + ## Can be "laplacian", "gaussian" or "uniform". + # type = "laplacian + + ## Center of the distribution. + ## Only used for Laplacian and Gaussian distributions. + # mu = 0.0 + + ## Scale parameter for the Laplacian or Gaussian distribution + # scale = 1.0 + + ## Upper and lower bound of the Uniform distribution + # min = -1.0 + # max = 1.0 + + ## Apply the noise only to numeric fields matching the filter criteria below. + ## Excludes takes precedence over includes. + # include_fields = [] + # exclude_fields = [] +` + +type Noise struct { + Scale float64 `toml:"scale"` + Min float64 `toml:"min"` + Max float64 `toml:"max"` + Mu float64 `toml:"mu"` + IncludeFields []string `toml:"include_fields"` + ExcludeFields []string `toml:"exclude_fields"` + NoiseType string `toml:"type"` + Log telegraf.Logger `toml:"-"` + generator distuv.Rander + fieldFilter filter.Filter +} + +func (p *Noise) SampleConfig() string { + return sampleConfig +} + +func (p *Noise) Description() string { + return "Adds noise to numerical fields" +} + +// generates a random noise value depending on the defined probability density +// function and adds that to the original value. If any integer overflows +// happen during the calculation, the result is set to MaxInt or 0 (for uint) +func (p *Noise) addNoise(value interface{}) interface{} { + n := p.generator.Rand() + switch v := value.(type) { + case int: + case int8: + case int16: + case int32: + case int64: + if v > 0 && (n > math.Nextafter(float64(math.MaxInt64), 0) || int64(n) > math.MaxInt64-v) { + p.Log.Debug("Int64 overflow, setting value to MaxInt64") + return int64(math.MaxInt64) + } + if v < 0 && (n < math.Nextafter(float64(math.MinInt64), 0) || int64(n) < math.MinInt64-v) { + p.Log.Debug("Int64 (negative) overflow, setting value to MinInt64") + return int64(math.MinInt64) + } + return v + int64(n) + case uint: + case uint8: + case uint16: + case uint32: + case uint64: + if n < 0 { + if uint64(-n) > v { + p.Log.Debug("Uint64 (negative) overflow, setting value to 0") + return uint64(0) + } + return v - uint64(-n) + } + if n > math.Nextafter(float64(math.MaxUint64), 0) || uint64(n) > math.MaxUint64-v { + p.Log.Debug("Uint64 overflow, setting value to MaxUint64") + return uint64(math.MaxUint64) + } + return v + uint64(n) + case float32: + return v + float32(n) + case float64: + return v + n + default: + p.Log.Debugf("Value (%v) type invalid: [%v] is not an int, uint or float", v, reflect.TypeOf(value)) + } + return value +} + +// Creates a filter for Include and Exclude fields and sets the desired noise +// distribution +func (p *Noise) Init() error { + fieldFilter, err := filter.NewIncludeExcludeFilter(p.IncludeFields, p.ExcludeFields) + if err != nil { + return fmt.Errorf("creating fieldFilter failed: %v", err) + } + p.fieldFilter = fieldFilter + + switch p.NoiseType { + case "", "laplacian": + p.generator = &distuv.Laplace{Mu: p.Mu, Scale: p.Scale} + case "uniform": + p.generator = &distuv.Uniform{Min: p.Min, Max: p.Max} + case "gaussian": + p.generator = &distuv.Normal{Mu: p.Mu, Sigma: p.Scale} + default: + return fmt.Errorf("unknown distribution type %q", p.NoiseType) + } + return nil +} + +func (p *Noise) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + for _, metric := range metrics { + for _, field := range metric.FieldList() { + if !p.fieldFilter.Match(field.Key) { + continue + } + field.Value = p.addNoise(field.Value) + } + } + return metrics +} + +func init() { + processors.Add("noise", func() telegraf.Processor { + return &Noise{ + NoiseType: defaultNoiseType, + Mu: defaultMu, + Scale: defaultScale, + Min: defaultMin, + Max: defaultMax, + } + }) +} diff --git a/plugins/processors/noise/noise_test.go b/plugins/processors/noise/noise_test.go new file mode 100644 index 0000000000000..f491d1f09876c --- /dev/null +++ b/plugins/processors/noise/noise_test.go @@ -0,0 +1,378 @@ +package noise + +import ( + "math" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "gonum.org/v1/gonum/stat/distuv" +) + +type testDistribution struct { + value float64 +} + +func (t *testDistribution) Rand() float64 { + return t.value +} + +// Verifies that field values are modified by the Laplace noise +func TestAddNoiseToMetric(t *testing.T) { + generators := []string{"laplacian", "gaussian", "uniform"} + for _, generator := range generators { + p := Noise{ + NoiseType: generator, + Scale: 1.0, + Mu: 0.0, + Min: -1, + Max: 1, + Log: testutil.Logger{}, + } + require.NoError(t, p.Init()) + for _, m := range testutil.MockMetrics() { + after := p.Apply(m.Copy()) + require.Len(t, after, 1) + require.NotEqual(t, m, after[0]) + } + } +} + +// Verifies that a given noise is added correctly to values +func TestAddNoise(t *testing.T) { + tests := []struct { + name string + input []telegraf.Metric + expected []telegraf.Metric + distribution distuv.Rander + }{ + { + name: "int64", + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": int64(5)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": int64(-10)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": int64(4)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": int64(-11)}, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: -1.5}, + }, + { + name: "uint64", + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": uint64(25)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": uint64(0)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": uint64(26)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": uint64(1)}, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: 1.5}, + }, + { + name: "float64", + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(0.0005)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(1000.5)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(5.0005)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(1005.5)}, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: 5.0}, + }, + { + name: "float64", + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(0.0005)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(1000.5)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(-0.4995)}, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": float64(1000)}, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: -0.5}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := Noise{ + NoiseType: "laplacian", + Scale: 1.0, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.generator = tt.distribution + + actual := plugin.Apply(tt.input...) + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} + +// Tests that int64 & uint64 overflow errors are catched +func TestAddNoiseOverflowCheck(t *testing.T) { + tests := []struct { + name string + input []telegraf.Metric + expected []telegraf.Metric + distribution distuv.Rander + }{ + { + name: "underflow", + input: []telegraf.Metric{ + testutil.MustMetric("underflow_int64", + map[string]string{}, + map[string]interface{}{"value": math.MinInt64}, + time.Unix(0, 0), + ), + testutil.MustMetric("underflow_uint64_1", + map[string]string{}, + map[string]interface{}{"value": uint64(5)}, + time.Unix(0, 0), + ), + testutil.MustMetric("underflow_uint64_2", + map[string]string{}, + map[string]interface{}{"value": uint64(0)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("underflow_int64", + map[string]string{}, + map[string]interface{}{"value": math.MinInt64}, + time.Unix(0, 0), + ), + testutil.MustMetric("underflow_uint64_1", + map[string]string{}, + map[string]interface{}{"value": uint64(4)}, + time.Unix(0, 0), + ), + testutil.MustMetric("underflow_uint64_2", + map[string]string{}, + map[string]interface{}{"value": uint64(0)}, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: -1.0}, + }, + { + name: "overflow", + input: []telegraf.Metric{ + testutil.MustMetric("overflow_int64", + map[string]string{}, + map[string]interface{}{"value": math.MaxInt64}, + time.Unix(0, 0), + ), + testutil.MustMetric("overflow_uint", + map[string]string{}, + map[string]interface{}{"value": uint(math.MaxUint)}, + time.Unix(0, 0), + ), + testutil.MustMetric("overflow_uint64", + map[string]string{}, + map[string]interface{}{"value": uint64(math.MaxUint64)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("overflow_int64", + map[string]string{}, + map[string]interface{}{"value": math.MaxInt64}, + time.Unix(0, 0), + ), + testutil.MustMetric("overflow_uint", + map[string]string{}, + map[string]interface{}{"value": uint(math.MaxUint)}, + time.Unix(0, 0), + ), + testutil.MustMetric("overflow_uint64", + map[string]string{}, + map[string]interface{}{"value": uint64(math.MaxUint64)}, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: 1.0}, + }, + { + name: "non-numeric fields", + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "test", + "b": true, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "test", + "b": true, + }, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: 1.0}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := Noise{ + NoiseType: "laplacian", + Scale: 1.0, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.generator = tt.distribution + + actual := plugin.Apply(tt.input...) + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} + +// Verifies that even addNoise() modifies 0 values as well +func TestAddNoiseWithZeroValue(t *testing.T) { + tests := []struct { + name string + input []telegraf.Metric + expected []telegraf.Metric + distribution distuv.Rander + }{ + { + name: "zeros", + input: []telegraf.Metric{ + testutil.MustMetric("zero_uint64", + map[string]string{}, + map[string]interface{}{"value": uint64(0)}, + time.Unix(0, 0), + ), + testutil.MustMetric("zero_int64", + map[string]string{}, + map[string]interface{}{"value": int64(0)}, + time.Unix(0, 0), + ), + testutil.MustMetric("zero_float", + map[string]string{}, + map[string]interface{}{"value": float64(0.0)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("zero_uint64", + map[string]string{}, + map[string]interface{}{"value": uint64(13)}, + time.Unix(0, 0), + ), + testutil.MustMetric("zero_int64", + map[string]string{}, + map[string]interface{}{"value": int64(13)}, + time.Unix(0, 0), + ), + testutil.MustMetric("zero_float", + map[string]string{}, + map[string]interface{}{"value": float64(13.37)}, + time.Unix(0, 0), + ), + }, + distribution: &testDistribution{value: 13.37}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := Noise{ + NoiseType: "laplacian", + Scale: 1.0, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.generator = tt.distribution + + actual := plugin.Apply(tt.input...) + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} + +// Verifies that any invalid generator setting (not "laplacian", "gaussian" or +// "uniform") raises an error +func TestInvalidDistributionFunction(t *testing.T) { + p := Noise{ + NoiseType: "invalid", + Log: testutil.Logger{}, + } + err := p.Init() + require.EqualError(t, err, "unknown distribution type \"invalid\"") +}