Commit

feat(outputs.influxdb_v2): Add rate limit implementation (influxdata#…
srebhan authored Dec 6, 2024
1 parent be2d5ef commit 1170985
Showing 7 changed files with 245 additions and 55 deletions.
8 changes: 6 additions & 2 deletions plugins/common/ratelimiter/config.go
@@ -1,6 +1,7 @@
package ratelimiter

import (
"errors"
"time"

"github.com/influxdata/telegraf/config"
@@ -11,9 +12,12 @@ type RateLimitConfig struct {
Period config.Duration `toml:"rate_limit_period"`
}

func (cfg *RateLimitConfig) CreateRateLimiter() *RateLimiter {
func (cfg *RateLimitConfig) CreateRateLimiter() (*RateLimiter, error) {
if cfg.Limit > 0 && cfg.Period <= 0 {
return nil, errors.New("invalid period for rate-limit")
}
return &RateLimiter{
limit: int64(cfg.Limit),
period: time.Duration(cfg.Period),
}
}, nil
}
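
Callers of `CreateRateLimiter` must now handle a possible error. A minimal sketch of the new call pattern, assuming only the fields shown in this diff; the concrete values are illustrative:

```go
package main

import (
	"log"
	"time"

	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
)

func main() {
	cfg := &ratelimiter.RateLimitConfig{
		Limit:  config.Size(1024 * 1024),         // e.g. 1MB per window
		Period: config.Duration(5 * time.Minute), // the fixed time window
	}
	limiter, err := cfg.CreateRateLimiter()
	if err != nil {
		// Reached when a limit is set without a positive period.
		log.Fatal(err)
	}
	_ = limiter
}
```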
24 changes: 18 additions & 6 deletions plugins/common/ratelimiter/limiters_test.go
@@ -9,9 +9,16 @@ import (
"github.com/stretchr/testify/require"
)

func TestInvalidPeriod(t *testing.T) {
cfg := &RateLimitConfig{Limit: config.Size(1024)}
_, err := cfg.CreateRateLimiter()
require.ErrorContains(t, err, "invalid period for rate-limit")
}

func TestUnlimited(t *testing.T) {
cfg := &RateLimitConfig{}
limiter := cfg.CreateRateLimiter()
limiter, err := cfg.CreateRateLimiter()
require.NoError(t, err)

start := time.Now()
end := start.Add(30 * time.Minute)
@@ -24,7 +31,8 @@ func TestUnlimitedWithPeriod(t *testing.T) {
cfg := &RateLimitConfig{
Period: config.Duration(5 * time.Minute),
}
limiter := cfg.CreateRateLimiter()
limiter, err := cfg.CreateRateLimiter()
require.NoError(t, err)

start := time.Now()
end := start.Add(30 * time.Minute)
@@ -67,7 +75,8 @@ func TestLimited(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name+" at period", func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step)
@@ -85,7 +94,8 @@ func TestLimited(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step).Add(1 * time.Second)
@@ -134,7 +144,8 @@ func TestUndo(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name+" at period", func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step)
@@ -156,7 +167,8 @@ func TestUndo(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step).Add(1 * time.Second)
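
For orientation, the configurations exercised by the tests above reduce to a few shapes. A hedged sketch with illustrative values (the concrete numbers are not taken from the table-driven cases):

```go
package sketch

import (
	"time"

	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
)

var (
	// TestUnlimited: an empty config never limits.
	unlimited = ratelimiter.RateLimitConfig{}

	// TestUnlimitedWithPeriod: a period alone still means unlimited.
	windowOnly = ratelimiter.RateLimitConfig{
		Period: config.Duration(5 * time.Minute),
	}

	// TestLimited / TestUndo: a limit plus a period bounds the bytes per window.
	limited = ratelimiter.RateLimitConfig{
		Limit:  config.Size(1024),
		Period: config.Duration(time.Second),
	}

	// TestInvalidPeriod: a limit without a period is rejected by CreateRateLimiter.
	invalid = ratelimiter.RateLimitConfig{Limit: config.Size(1024)}
)
```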
6 changes: 6 additions & 0 deletions plugins/outputs/influxdb_v2/README.md
@@ -101,6 +101,12 @@ to use them.
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Rate limits for sending data (disabled by default)
## Available, uncompressed payload size e.g. "5Mb"
# rate_limit = "unlimited"
## Fixed time-window for the available payload size e.g. "5m"
# rate_limit_period = "0s"
```

## Metrics
121 changes: 79 additions & 42 deletions plugins/outputs/influxdb_v2/http.go
@@ -22,7 +22,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
)

type APIError struct {
@@ -59,8 +59,9 @@ type httpClient struct {
pingTimeout config.Duration
readIdleTimeout config.Duration
tlsConfig *tls.Config
serializer *influx.Serializer
encoder internal.ContentEncoder
serializer ratelimiter.Serializer
rateLimiter *ratelimiter.RateLimiter
client *http.Client
params url.Values
retryTime time.Time
@@ -160,52 +161,69 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

batches := make(map[string][]telegraf.Metric)
batchIndices := make(map[string][]int)
if c.bucketTag == "" {
err := c.writeBatch(ctx, c.bucket, metrics)
if err != nil {
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

return err
batches[c.bucket] = metrics
batchIndices[c.bucket] = make([]int, len(metrics))
for i := range metrics {
batchIndices[c.bucket][i] = i
}
} else {
for _, metric := range metrics {
for i, metric := range metrics {
bucket, ok := metric.GetTag(c.bucketTag)
if !ok {
bucket = c.bucket
}

if _, ok := batches[bucket]; !ok {
batches[bucket] = make([]telegraf.Metric, 0)
}

if c.excludeBucketTag {
// Avoid modifying the metric in case we need to retry the request.
} else if c.excludeBucketTag {
// Avoid modifying the metric if we do remove the tag
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.bucketTag)
}

batches[bucket] = append(batches[bucket], metric)
batchIndices[c.bucket] = append(batchIndices[c.bucket], i)
}
}

var wErr internal.PartialWriteError
for bucket, batch := range batches {
err := c.writeBatch(ctx, bucket, batch)
if err == nil {
wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket]...)
continue
}

for bucket, batch := range batches {
err := c.writeBatch(ctx, bucket, batch)
if err != nil {
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

return err
// Check if the request was too large and split it
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
wErr.Err = err
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
return &wErr
}

// Check if we got a write error and if so, translate the returned
// metric indices to return the original indices in case of bucketing
var writeErr *internal.PartialWriteError
if errors.As(err, &writeErr) {
wErr.Err = writeErr.Err
for _, idx := range writeErr.MetricsAccept {
wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket][idx])
}
for _, idx := range writeErr.MetricsReject {
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket][idx])
}
if !errors.Is(writeErr.Err, internal.ErrSizeLimitReached) {
continue
}
return &wErr
}

// Return the error without special treatment
wErr.Err = err
return &wErr
}
return nil
}
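
Not part of the commit: a small self-contained sketch (with a hypothetical `mapBack` helper) of the index bookkeeping above, where per-bucket accept/reject positions are translated back to indices in the caller's original `metrics` slice:

```go
package sketch

// mapBack translates indices referring to positions inside one bucket's batch
// back into indices of the caller's original metrics slice, using the same
// batchIndices lookup that Write builds above.
func mapBack(batchIndices map[string][]int, bucket string, perBatch []int) []int {
	original := make([]int, 0, len(perBatch))
	for _, idx := range perBatch {
		original = append(original, batchIndices[bucket][idx])
	}
	return original
}
```

This mirrors the loops over `writeErr.MetricsAccept` and `writeErr.MetricsReject` in `Write`.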
@@ -222,11 +240,16 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr
}

func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
// Serialize the metrics
body, err := c.serializer.SerializeBatch(metrics)
if err != nil {
return err
// Get the current limit for the outbound data
ratets := time.Now()
limit := c.rateLimiter.Remaining(ratets)

// Serialize the metrics with the remaining limit, exit early if nothing was serialized
body, werr := c.serializer.SerializeBatch(metrics, limit)
if werr != nil && !errors.Is(werr, internal.ErrSizeLimitReached) || len(body) == 0 {
return werr
}
used := int64(len(body))

// Encode the content if requested
if c.encoder != nil {
@@ -249,6 +272,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
c.addHeaders(req)

// Execute the request
c.rateLimiter.Accept(ratets, used)
resp, err := c.client.Do(req.WithContext(ctx))
if err != nil {
internal.OnClientError(c.client, err)
@@ -269,7 +293,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
http.StatusMultiStatus,
http.StatusAlreadyReported:
c.retryCount = 0
return nil
return werr
}

// We got an error and now try to decode further
@@ -294,11 +318,18 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
http.StatusBadRequest,
// request was received but server refused to process it due to a semantic problem with the request.
// for example, submitting metrics outside the retention period.
// Clients should *not* repeat the request and the metrics should be dropped.
http.StatusUnprocessableEntity,
http.StatusNotAcceptable:
c.log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc)
return nil

// Clients should *not* repeat the request and the metrics should be dropped.
rejected := make([]int, 0, len(metrics))
for i := range len(metrics) {
rejected = append(rejected, i)
}
return &internal.PartialWriteError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
MetricsReject: rejected,
}
case http.StatusUnauthorized, http.StatusForbidden:
return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc)
case http.StatusTooManyRequests,
@@ -316,8 +347,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
// if it's any other 4xx code, the client should not retry as it's the client's mistake.
// retrying will not make the request magically work.
if len(resp.Status) > 0 && resp.Status[0] == '4' {
c.log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc)
return nil
rejected := make([]int, 0, len(metrics))
for i := range len(metrics) {
rejected = append(rejected, i)
}
return &internal.PartialWriteError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
MetricsReject: rejected,
}
}

// This is only until platform spec is fully implemented. As of the
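
The heart of the rate limiting in `writeBatch` is a budget check before serialization and an `Accept` afterwards. A minimal sketch of that cycle, assuming only the names visible in the diff (`Remaining`, `Accept`, the limited `SerializeBatch(metrics, limit)`, and `ErrSizeLimitReached`); the `send` function itself is hypothetical:

```go
package sketch

import (
	"errors"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
)

// send sketches the budget/serialize/accept cycle used by writeBatch above.
func send(limiter *ratelimiter.RateLimiter, serializer ratelimiter.Serializer, metrics []telegraf.Metric) error {
	ts := time.Now()
	budget := limiter.Remaining(ts) // bytes still available in the current window

	// Serialize no more than the remaining budget allows; bail out early if
	// nothing fits or a non-size error occurred.
	body, err := serializer.SerializeBatch(metrics, budget)
	if err != nil && !errors.Is(err, internal.ErrSizeLimitReached) || len(body) == 0 {
		return err
	}

	// Record the bytes actually handed to the transport.
	limiter.Accept(ts, int64(len(body)))

	// ... build and execute the HTTP request here ...

	// A truncated batch still reports ErrSizeLimitReached to the caller.
	return err
}
```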
22 changes: 17 additions & 5 deletions plugins/outputs/influxdb_v2/influxdb_v2.go
@@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
commontls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/influx"
@@ -44,10 +45,11 @@ type InfluxDB struct {
ReadIdleTimeout config.Duration `toml:"read_idle_timeout"`
Log telegraf.Logger `toml:"-"`
commontls.ClientConfig
ratelimiter.RateLimitConfig

clients []*httpClient
encoder internal.ContentEncoder
serializer *influx.Serializer
serializer ratelimiter.Serializer
tlsCfg *tls.Config
}

@@ -65,7 +67,7 @@ func (i *InfluxDB) Init() error {
i.URLs = append(i.URLs, "http://localhost:8086")
}

// Check options
// Init encoding if configured
switch i.ContentEncoding {
case "", "gzip":
i.ContentEncoding = "gzip"
@@ -80,13 +82,14 @@
}

// Setup the limited serializer
i.serializer = &influx.Serializer{
serializer := &influx.Serializer{
UintSupport: i.UintSupport,
OmitTimestamp: i.OmitTimestamp,
}
if err := i.serializer.Init(); err != nil {
if err := serializer.Init(); err != nil {
return fmt.Errorf("setting up serializer failed: %w", err)
}
i.serializer = ratelimiter.NewIndividualSerializer(serializer)

// Setup the client config
tlsCfg, err := i.ClientConfig.TLSConfig()
@@ -142,6 +145,10 @@ func (i *InfluxDB) Connect() error {

switch parts.Scheme {
case "http", "https", "unix":
limiter, err := i.RateLimitConfig.CreateRateLimiter()
if err != nil {
return err
}
c := &httpClient{
url: parts,
localAddr: localAddr,
@@ -158,8 +165,9 @@
tlsConfig: i.tlsCfg,
pingTimeout: i.PingTimeout,
readIdleTimeout: i.ReadIdleTimeout,
serializer: i.serializer,
encoder: i.encoder,
rateLimiter: limiter,
serializer: i.serializer,
log: i.Log,
}

@@ -191,6 +199,10 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
for _, n := range rand.Perm(len(i.clients)) {
client := i.clients[n]
if err := client.Write(ctx, metrics); err != nil {
var werr *internal.PartialWriteError
if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) {
return err
}
i.Log.Errorf("When writing to [%s]: %v", client.url, err)
continue
}
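
With this change, `Write` propagates partial results instead of only logging them. A hedged sketch of how such an error could be inspected, assuming only the `PartialWriteError` fields visible in the diff (`Err`, `MetricsAccept`, `MetricsReject`) and the `ErrSizeLimitReached` sentinel; the `inspect` helper is hypothetical:

```go
package sketch

import (
	"errors"
	"log"

	"github.com/influxdata/telegraf/internal"
)

// inspect shows how a partial-write result can be unpacked by a caller.
func inspect(err error) {
	var werr *internal.PartialWriteError
	switch {
	case errors.As(err, &werr):
		log.Printf("partial write: %d accepted, %d rejected: %v",
			len(werr.MetricsAccept), len(werr.MetricsReject), werr.Err)
	case errors.Is(err, internal.ErrSizeLimitReached):
		log.Print("size limit reached before the whole batch could be serialized")
	case err != nil:
		log.Printf("write failed: %v", err)
	}
}
```

In the plugin itself, such errors are returned to the output model rather than logged, as the `Write` loop above shows.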