Commit
feat(outputs): Add rate-limiting infrastructure (influxdata#16258)
srebhan authored Dec 6, 2024
1 parent 304ab2e commit be2d5ef
Showing 7 changed files with 924 additions and 21 deletions.
6 changes: 5 additions & 1 deletion internal/errors.go
@@ -2,7 +2,11 @@ package internal

import "errors"

var ErrNotConnected = errors.New("not connected")
var (
ErrNotConnected = errors.New("not connected")
ErrSerialization = errors.New("serialization of metric(s) failed")
ErrSizeLimitReached = errors.New("size limit reached")
)

// StartupError indicates an error that occurred during startup of a plugin
// e.g. due to connectivity issues or resources being not yet available.
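
The two new sentinel errors give callers a way to distinguish a hit size/rate limit from a serialization failure. A minimal caller sketch (illustrative only, not part of this commit; it assumes imports of "errors" and the telegraf internal package), mirroring the require.ErrorIs checks in the tests below:

func handleWriteError(err error) {
    switch {
    case err == nil:
        // full success
    case errors.Is(err, internal.ErrSizeLimitReached):
        // the size limit was hit; metrics not yet written stay buffered
        // and are retried on the next flush
    case errors.Is(err, internal.ErrSerialization):
        // one or more metrics could not be serialized
    default:
        // any other write error
    }
}
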
227 changes: 207 additions & 20 deletions models/running_output_test.go
@@ -245,7 +245,7 @@ func TestRunningOutputWriteFail(t *testing.T) {
Filter: Filter{},
}

m := &mockOutput{failWrite: true}
m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 4, 12)

// Fill buffer to limit twice
@@ -264,7 +264,7 @@ func TestRunningOutputWriteFail(t *testing.T) {
// no successful flush yet
require.Empty(t, m.Metrics())

m.failWrite = false
m.batchAcceptSize = 0
err = ro.Write()
require.NoError(t, err)

@@ -277,7 +277,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
Filter: Filter{},
}

m := &mockOutput{failWrite: true}
m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 100, 1000)

// add 5 metrics
@@ -293,7 +293,8 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
// no successful flush yet
require.Empty(t, m.Metrics())

m.failWrite = false
m.batchAcceptSize = 0

// add 5 more metrics
for _, metric := range next5 {
ro.AddMetric(metric)
@@ -314,7 +315,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
Filter: Filter{},
}

m := &mockOutput{failWrite: true}
m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 5, 100)

// add 5 metrics
@@ -357,7 +358,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
// no successful flush yet
require.Empty(t, m.Metrics())

m.failWrite = false
m.batchAcceptSize = 0
err = ro.Write()
require.NoError(t, err)

@@ -377,7 +378,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
Filter: Filter{},
}

m := &mockOutput{failWrite: true}
m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 5, 1000)

// add 5 metrics
@@ -399,7 +400,8 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
require.Error(t, err)

// unset fail and write metrics
m.failWrite = false
m.batchAcceptSize = 0

err = ro.Write()
require.NoError(t, err)

@@ -620,7 +622,7 @@ func TestRunningOutputNonRetryableStartupBehaviorDefault(t *testing.T) {
}
}

func TestRunningOutputUntypedtartupBehaviorIgnore(t *testing.T) {
func TestRunningOutputUntypedStartupBehaviorIgnore(t *testing.T) {
serr := errors.New("untyped err")

for _, behavior := range []string{"", "error", "retry", "ignore"} {
@@ -692,12 +694,181 @@ func TestRunningOutputPartiallyStarted(t *testing.T) {
require.Equal(t, 3, mo.writes)
}

func TestRunningOutputWritePartialSuccess(t *testing.T) {
plugin := &mockOutput{
batchAcceptSize: 4,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 4)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 8)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.Write())
testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics)
require.Zero(t, model.buffer.Len())
}

func TestRunningOutputWritePartialSuccessAndLoss(t *testing.T) {
lost := 0
plugin := &mockOutput{
batchAcceptSize: 4,
metricFatalIndex: &lost,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}
expected := []telegraf.Metric{
/* fatal, */ first5[1], first5[2], first5[3],
/* fatal, */ next5[0], next5[1], next5[2],
next5[3], next5[4],
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 3)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 6)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.Write())
testutil.RequireMetricsEqual(t, expected, plugin.metrics)
require.Zero(t, model.buffer.Len())
}

func TestRunningOutputWriteBatchPartialSuccess(t *testing.T) {
plugin := &mockOutput{
batchAcceptSize: 4,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 4)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 8)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.WriteBatch())
testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics)
require.Zero(t, model.buffer.Len())
}

func TestRunningOutputWriteBatchPartialSuccessAndLoss(t *testing.T) {
lost := 0
plugin := &mockOutput{
batchAcceptSize: 4,
metricFatalIndex: &lost,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}
expected := []telegraf.Metric{
/* fatal, */ first5[1], first5[2], first5[3],
/* fatal, */ next5[0], next5[1], next5[2],
next5[3], next5[4],
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 3)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 6)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.WriteBatch())
testutil.RequireMetricsEqual(t, expected, plugin.metrics)
require.Zero(t, model.buffer.Len())
}

// Benchmark adding metrics.
func BenchmarkRunningOutputAddWrite(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}

m := &perfOutput{}
ro := NewRunningOutput(m, conf, 1000, 10000)

@@ -712,7 +883,6 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}

m := &perfOutput{}
ro := NewRunningOutput(m, conf, 1000, 10000)

@@ -729,10 +899,8 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}

m := &perfOutput{failWrite: true}
ro := NewRunningOutput(m, conf, 1000, 10000)

for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1"))
}
@@ -743,9 +911,11 @@ type mockOutput struct {

metrics []telegraf.Metric

// if true, mock write failure
failWrite bool
// Failing output simulation
batchAcceptSize int
metricFatalIndex *int

// Startup error simulation
startupError error
startupErrorCount int
writes int
@@ -761,11 +931,11 @@ func (m *mockOutput) Connect() error {
return m.startupError
}

func (m *mockOutput) Close() error {
func (*mockOutput) Close() error {
return nil
}

func (m *mockOutput) SampleConfig() string {
func (*mockOutput) SampleConfig() string {
return ""
}

@@ -774,12 +944,29 @@ func (m *mockOutput) Write(metrics []telegraf.Metric) error {

m.Lock()
defer m.Unlock()
if m.failWrite {

// Simulate a failed write
if m.batchAcceptSize < 0 {
return errors.New("failed write")
}

m.metrics = append(m.metrics, metrics...)
return nil
// Simulate a successful write
if m.batchAcceptSize == 0 || len(metrics) <= m.batchAcceptSize {
m.metrics = append(m.metrics, metrics...)
return nil
}

// Simulate a partially successful write
werr := &internal.PartialWriteError{Err: internal.ErrSizeLimitReached}
for i, x := range metrics {
if m.metricFatalIndex != nil && i == *m.metricFatalIndex {
werr.MetricsReject = append(werr.MetricsReject, i)
} else if i < m.batchAcceptSize {
m.metrics = append(m.metrics, x)
werr.MetricsAccept = append(werr.MetricsAccept, i)
}
}
return werr
}

func (m *mockOutput) Metrics() []telegraf.Metric {
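
The mock's Write above exercises the new partial-write contract: through internal.PartialWriteError an output can report which metrics of a batch were accepted and which were fatally rejected, while everything else stays buffered for the next flush. A hedged sketch of how a real output might use it (myOutput, fitsLimit, and send are hypothetical helpers, not part of this commit):

func (o *myOutput) Write(metrics []telegraf.Metric) error {
    accepted := make([]int, 0, len(metrics))
    for i, m := range metrics {
        if !o.fitsLimit(m) { // hypothetical rate/size check
            // Stop early: accepted metrics are dropped from the buffer,
            // the remaining ones are kept and retried later.
            return &internal.PartialWriteError{
                Err:           internal.ErrSizeLimitReached,
                MetricsAccept: accepted,
            }
        }
        // hypothetical send; unserializable metrics could instead be
        // reported via the MetricsReject indices so they are not retried
        o.send(m)
        accepted = append(accepted, i)
    }
    return nil
}
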
19 changes: 19 additions & 0 deletions plugins/common/ratelimiter/config.go
@@ -0,0 +1,19 @@
package ratelimiter

import (
"time"

"github.com/influxdata/telegraf/config"
)

type RateLimitConfig struct {
Limit config.Size `toml:"rate_limit"`
Period config.Duration `toml:"rate_limit_period"`
}

func (cfg *RateLimitConfig) CreateRateLimiter() *RateLimiter {
return &RateLimiter{
limit: int64(cfg.Limit),
period: time.Duration(cfg.Period),
}
}
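
RateLimitConfig can be embedded into a plugin's configuration struct so that the rate_limit and rate_limit_period options become available in its TOML section, with CreateRateLimiter building the limiter from those values. A minimal sketch under that assumption (MyOutput and its Init are hypothetical; the rest of the RateLimiter API is not shown in this excerpt):

type MyOutput struct {
    ratelimiter.RateLimitConfig

    limiter *ratelimiter.RateLimiter
}

func (o *MyOutput) Init() error {
    // rate_limit and rate_limit_period are picked up through the embedded config
    o.limiter = o.CreateRateLimiter()
    return nil
}

In the plugin's TOML this would surface as something like rate_limit = "1MB" and rate_limit_period = "30s", since the fields use config.Size and config.Duration.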