Skip to content

Commit

Permalink
Collection interval random jittering
Browse files Browse the repository at this point in the history
closes #460
  • Loading branch information
sparrc committed Jan 19, 2016
1 parent f3b5537 commit 1d839f4
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
19 changes: 15 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package telegraf

import (
"crypto/rand"
cryptorand "crypto/rand"
"fmt"
"log"
"math/big"
"math/rand"
"os"
"sync"
"time"
Expand Down Expand Up @@ -92,6 +93,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

start := time.Now()
counter := 0
jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
for _, input := range a.Config.Inputs {
if input.Config.Interval != 0 {
continue
Expand All @@ -104,9 +106,19 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

acc := NewAccumulator(input.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug)
// acc.SetPrefix(input.Name + "_")
acc.SetDefaultTags(a.Config.Tags)

if jitter != 0 {
nanoSleep := rand.Int63n(jitter)
d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
if err != nil {
log.Printf("Jittering collection interval failed for plugin %s",
input.Name)
} else {
time.Sleep(d)
}
}

if err := input.Input.Gather(acc); err != nil {
log.Printf("Error in input [%s]: %s", input.Name, err)
}
Expand Down Expand Up @@ -143,7 +155,6 @@ func (a *Agent) gatherSeparate(

acc := NewAccumulator(input.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug)
// acc.SetPrefix(input.Name + "_")
acc.SetDefaultTags(a.Config.Tags)

if err := input.Input.Gather(acc); err != nil {
Expand Down Expand Up @@ -315,7 +326,7 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration {
outinterval := ininterval
if injitter.Nanoseconds() != 0 {
maxjitter := big.NewInt(injitter.Nanoseconds())
if j, err := rand.Int(rand.Reader, maxjitter); err == nil {
if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil {
jitter = j.Int64()
}
outinterval = time.Duration(jitter + ininterval.Nanoseconds())
Expand Down
16 changes: 15 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,22 @@ type AgentConfig struct {
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration

// Interval at which to flush data
FlushInterval internal.Duration

// FlushRetries is the number of times to retry each data flush
FlushRetries int

// FlushJitter tells
// FlushJitter Jitters the flush interval by a random amount.
// This is primarily to avoid large write spikes for users running a large
// number of telegraf instances.
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
FlushJitter internal.Duration

// TODO(cam): Remove UTC and Precision parameters, they are no longer
Expand Down Expand Up @@ -271,6 +280,11 @@ var header = `# Telegraf configuration
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
# Collection jitter is used to jitter the collection by a random amount.
# Each plugin will sleep for a random time within jitter before collecting.
# This can be used to avoid many plugins querying things like sysfs at the
# same time, which can have a measurable effect on the system.
collection_jitter = "0s"
# Default data flushing interval for all outputs. You should not set this below
# interval. Maximum flush_interval will be flush_interval + flush_jitter
Expand Down

0 comments on commit 1d839f4

Please sign in to comment.