feat(inputs.influxdb_v2_listener): Add support for rate limiting #15361

Merged
5 changes: 5 additions & 0 deletions plugins/inputs/influxdb_v2_listener/README.md
@@ -40,6 +40,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
  ## (Double check the port. Could be 9999 if using OSS Beta)
  service_address = ":8086"

  ## Maximum number of undelivered metrics before the rate limit kicks in.
  ## When the rate limit kicks in, HTTP status 429 will be returned.
  ## 0 disables rate limiting
  # max_undelivered_metrics = 0

  ## Maximum duration before timing out read of the request
  # read_timeout = "10s"
  ## Maximum duration before timing out write of the response
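With this option set, the listener answers `429 Too Many Requests` once the undelivered-metrics budget is exhausted, and `413 Request Entity Too Large` for batches that can never fit within the budget, so writers should be prepared to back off and retry. A minimal client-side sketch, assuming the listener is reachable at `http://localhost:8086`; the `writeWithBackoff` helper, retry count, and delay are illustrative and not part of the plugin:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// writeWithBackoff posts one line-protocol batch and retries when the
// listener answers 429 (rate limited). 413 means the batch is larger than
// max_undelivered_metrics and will never be accepted, so it is not retried.
func writeWithBackoff(client *http.Client, url string, body []byte) error {
	for attempt := 0; attempt < 5; attempt++ {
		resp, err := client.Post(url, "text/plain; charset=utf-8", bytes.NewReader(body))
		if err != nil {
			return err
		}
		resp.Body.Close()

		switch resp.StatusCode {
		case http.StatusNoContent: // 204: batch accepted
			return nil
		case http.StatusTooManyRequests: // 429: rate limited, back off and retry
			time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
		case http.StatusRequestEntityTooLarge: // 413: batch can never fit
			return fmt.Errorf("batch larger than max_undelivered_metrics")
		default:
			return fmt.Errorf("unexpected status %d", resp.StatusCode)
		}
	}
	return fmt.Errorf("still rate limited after retries")
}

func main() {
	batch := []byte("xyzzy value=42\n")
	url := "http://localhost:8086/api/v2/write?bucket=mybucket&precision=s"
	if err := writeWithBackoff(http.DefaultClient, url, batch); err != nil {
		fmt.Println("write failed:", err)
	}
}
```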
87 changes: 77 additions & 10 deletions plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go
@@ -13,6 +13,8 @@ import (
"net"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
@@ -52,19 +54,27 @@ type InfluxDBV2Listener struct {
	port           int
	tlsint.ServerConfig

	MaxUndeliveredMetrics int             `toml:"max_undelivered_metrics"`
	ReadTimeout           config.Duration `toml:"read_timeout"`
	WriteTimeout          config.Duration `toml:"write_timeout"`
	MaxBodySize           config.Size     `toml:"max_body_size"`
	Token                 string          `toml:"token"`
	BucketTag             string          `toml:"bucket_tag"`
	ParserType            string          `toml:"parser_type"`

	ctx                     context.Context
	cancel                  context.CancelFunc
	trackingMetricCount     map[telegraf.TrackingID]int64
	countLock               sync.Mutex
	totalUndeliveredMetrics atomic.Int64

	timeFunc influx.TimeFunc

	listener net.Listener
	server   http.Server

	acc         telegraf.Accumulator
	trackingAcc telegraf.TrackingAccumulator

	bytesRecv      selfstat.Stat
	requestsServed selfstat.Stat
@@ -135,6 +145,26 @@ func (h *InfluxDBV2Listener) Init() error {
// Start starts the InfluxDB listener service.
func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {
	h.acc = acc
	h.ctx, h.cancel = context.WithCancel(context.Background())
	if h.MaxUndeliveredMetrics > 0 {
		h.trackingAcc = h.acc.WithTracking(h.MaxUndeliveredMetrics)
		h.trackingMetricCount = make(map[telegraf.TrackingID]int64, h.MaxUndeliveredMetrics)
		// Watch for delivery notifications and release the delivered metrics
		// from the undelivered-metrics budget. The map lookup is guarded by
		// countLock because writeWithTracking mutates the map concurrently.
		go func() {
			for {
				select {
				case <-h.ctx.Done():
					return
				case info := <-h.trackingAcc.Delivered():
					h.countLock.Lock()
					if count, ok := h.trackingMetricCount[info.ID()]; ok {
						h.totalUndeliveredMetrics.Add(-count)
						delete(h.trackingMetricCount, info.ID())
					}
					h.countLock.Unlock()
				}
			}
		}()
	}

	tlsConf, err := h.ServerConfig.TLSConfig()
	if err != nil {
@@ -180,6 +210,7 @@ func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {

// Stop cleans up all resources
func (h *InfluxDBV2Listener) Stop() {
	h.cancel()
	err := h.server.Shutdown(context.Background())
	if err != nil {
		h.Log.Infof("Error shutting down HTTP server: %v", err.Error())
@@ -219,6 +250,7 @@ func (h *InfluxDBV2Listener) handleDefault() http.HandlerFunc {
func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
	return func(res http.ResponseWriter, req *http.Request) {
		defer h.writesServed.Incr(1)

		// Check that the content length is not too large for us to handle.
		if req.ContentLength > int64(h.MaxBodySize) {
			if err := tooLarge(res, int64(h.MaxBodySize)); err != nil {
@@ -308,13 +340,48 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
if h.BucketTag != "" && bucket != "" {
m.AddTag(h.BucketTag, bucket)
}
}

h.acc.AddMetric(m)
if h.MaxUndeliveredMetrics > 0 {
h.writeWithTracking(res, metrics)
} else {
h.write(res, metrics)
}
}
}

func (h *InfluxDBV2Listener) writeWithTracking(res http.ResponseWriter, metrics []telegraf.Metric) {
if len(metrics) > h.MaxUndeliveredMetrics {
res.WriteHeader(http.StatusRequestEntityTooLarge)
h.Log.Debugf("status %d, always rejecting batch of %d metrics: larger than max_undelivered_metrics %d",
http.StatusRequestEntityTooLarge, len(metrics), h.MaxUndeliveredMetrics)
return
}

// http request success
res.WriteHeader(http.StatusNoContent)
pending := h.totalUndeliveredMetrics.Load()
remainingUndeliveredMetrics := int64(h.MaxUndeliveredMetrics) - pending
if int64(len(metrics)) > remainingUndeliveredMetrics {
res.WriteHeader(http.StatusTooManyRequests)
h.Log.Debugf("status %d, rejecting batch of %d metrics: larger than remaining undelivered metrics %d",
http.StatusTooManyRequests, len(metrics), remainingUndeliveredMetrics)
return
}

h.countLock.Lock()
trackingID := h.trackingAcc.AddTrackingMetricGroup(metrics)
h.trackingMetricCount[trackingID] = int64(len(metrics))
h.totalUndeliveredMetrics.Add(int64(len(metrics)))
h.countLock.Unlock()

res.WriteHeader(http.StatusNoContent)
}

func (h *InfluxDBV2Listener) write(res http.ResponseWriter, metrics []telegraf.Metric) {
for _, m := range metrics {
h.acc.AddMetric(m)
}

res.WriteHeader(http.StatusNoContent)
}

func tooLarge(res http.ResponseWriter, maxLength int64) error {
66 changes: 66 additions & 0 deletions plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go
@@ -67,6 +67,12 @@ func newTestAuthListener() *InfluxDBV2Listener {
	return listener
}

func newRateLimitedTestListener(maxUndeliveredMetrics int) *InfluxDBV2Listener {
	listener := newTestListener()
	listener.MaxUndeliveredMetrics = maxUndeliveredMetrics
	return listener
}

func newTestSecureListener() *InfluxDBV2Listener {
	listener := &InfluxDBV2Listener{
		Log: testutil.Logger{},
@@ -599,4 +605,64 @@ func TestWriteWithPrecisionNoTimestamp(t *testing.T) {
	require.Equal(t, time.Unix(42, 0), acc.Metrics[0].Time)
}

func TestRateLimitedConnectionDropsSecondRequest(t *testing.T) {
	listener := newRateLimitedTestListener(1)
	acc := &testutil.Accumulator{}
	require.NoError(t, listener.Init())
	require.NoError(t, listener.Start(acc))
	defer listener.Stop()

	msg := "xyzzy value=42\n"
	postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
	resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)

	resp, err = http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 429, resp.StatusCode)
}

func TestRateLimitedConnectionAcceptsNewRequestOnDelivery(t *testing.T) {
	listener := newRateLimitedTestListener(1)
	acc := &testutil.Accumulator{}
	require.NoError(t, listener.Init())
	require.NoError(t, listener.Start(acc))
	defer listener.Stop()

	msg := "xyzzy value=42\n"
	postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
	resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)

	ms := acc.GetTelegrafMetrics()
	for _, m := range ms {
		m.Accept()
	}

	resp, err = http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)
}

func TestRateLimitedConnectionRejectsBatchesLargerThanMaxUndeliveredMetrics(t *testing.T) {
	listener := newRateLimitedTestListener(1)
	acc := &testutil.Accumulator{}
	require.NoError(t, listener.Init())
	require.NoError(t, listener.Start(acc))
	defer listener.Stop()

	msg := "xyzzy value=42\nxyzzy value=43"
	postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
	resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 413, resp.StatusCode)
}

// The term 'master_repl' used here is archaic language from redis
5 changes: 5 additions & 0 deletions plugins/inputs/influxdb_v2_listener/sample.conf
@@ -4,6 +4,11 @@
  ## (Double check the port. Could be 9999 if using OSS Beta)
  service_address = ":8086"

  ## Maximum number of undelivered metrics before the rate limit kicks in.
  ## When the rate limit kicks in, HTTP status 429 will be returned.
  ## 0 disables rate limiting
  # max_undelivered_metrics = 0

  ## Maximum duration before timing out read of the request
  # read_timeout = "10s"
  ## Maximum duration before timing out write of the response