Skip to content

Commit

Permalink
Put Agent Config into the config package
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Nov 30, 2015
1 parent 979e5f1 commit a5f2d5f
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 121 deletions.
98 changes: 24 additions & 74 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/internal/config"
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
Expand All @@ -19,77 +18,27 @@ import (

// Agent runs telegraf and collects data based on the given config
type Agent struct {

// Interval at which to gather information
Interval internal.Duration

// RoundInterval rounds collection interval to 'interval'.
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// Interval at which to flush data
FlushInterval internal.Duration

// FlushRetries is the number of times to retry each data flush
FlushRetries int

// FlushJitter tells
FlushJitter internal.Duration

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability

// Option for outputting data in UTC
UTC bool `toml:"utc"`

// Precision to write data at
// Valid values for Precision are n, u, ms, s, m, and h
Precision string

// Option for running in debug mode
Debug bool
Hostname string

Tags map[string]string

Config *config.Config
}

// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *config.Config) (*Agent, error) {
agent := &Agent{
Tags: make(map[string]string),
Config: config,
Interval: internal.Duration{10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{10 * time.Second},
FlushRetries: 2,
FlushJitter: internal.Duration{5 * time.Second},
a := &Agent{
Config: config,
}

// Apply the toml table to the agent config, overriding defaults
err := config.ApplyAgent(agent)
if err != nil {
return nil, err
}

if agent.Hostname == "" {
if a.Config.Agent.Hostname == "" {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}

agent.Hostname = hostname
}

if config.Tags == nil {
config.Tags = map[string]string{}
a.Config.Agent.Hostname = hostname
}

config.Tags["host"] = agent.Hostname
config.Tags["host"] = a.Config.Agent.Hostname

return agent, nil
return a, nil
}

// Connect connects to all configured outputs
Expand All @@ -104,7 +53,7 @@ func (a *Agent) Connect() error {
}
}

if a.Debug {
if a.Config.Agent.Debug {
log.Printf("Attempting connection to output: %s\n", o.Name)
}
err := o.Output.Connect()
Expand All @@ -116,7 +65,7 @@ func (a *Agent) Connect() error {
return err
}
}
if a.Debug {
if a.Config.Agent.Debug {
log.Printf("Successfully connected to output: %s\n", o.Name)
}
}
Expand Down Expand Up @@ -154,9 +103,9 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
defer wg.Done()

acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Debug)
acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Tags)
acc.SetDefaultTags(a.Config.Tags)

if err := plugin.Plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.Name, err)
Expand All @@ -169,7 +118,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

elapsed := time.Since(start)
log.Printf("Gathered metrics, (%s interval), from %d plugins in %s\n",
a.Interval, counter, elapsed)
a.Config.Agent.Interval, counter, elapsed)
return nil
}

Expand All @@ -187,9 +136,9 @@ func (a *Agent) gatherSeparate(
start := time.Now()

acc := NewAccumulator(plugin.Config, pointChan)
acc.SetDebug(a.Debug)
acc.SetDebug(a.Config.Agent.Debug)
acc.SetPrefix(plugin.Name + "_")
acc.SetDefaultTags(a.Tags)
acc.SetDefaultTags(a.Config.Tags)

if err := plugin.Plugin.Gather(acc); err != nil {
log.Printf("Error in plugin [%s]: %s", plugin.Name, err)
Expand Down Expand Up @@ -273,7 +222,7 @@ func (a *Agent) writeOutput(
return
}
retry := 0
retries := a.FlushRetries
retries := a.Config.Agent.FlushRetries
start := time.Now()

for {
Expand All @@ -299,8 +248,8 @@ func (a *Agent) writeOutput(
} else if err != nil {
// Sleep for a retry
log.Printf("Error in output [%s]: %s, retrying in %s",
ro.Name, err.Error(), a.FlushInterval.Duration)
time.Sleep(a.FlushInterval.Duration)
ro.Name, err.Error(), a.Config.Agent.FlushInterval.Duration)
time.Sleep(a.Config.Agent.FlushInterval.Duration)
}
}

Expand Down Expand Up @@ -330,7 +279,7 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
// the flusher will flush after metrics are collected.
time.Sleep(time.Millisecond * 100)

ticker := time.NewTicker(a.FlushInterval.Duration)
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
points := make([]*client.Point, 0)

for {
Expand Down Expand Up @@ -373,22 +322,23 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration {
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup

a.FlushInterval.Duration = jitterInterval(a.FlushInterval.Duration,
a.FlushJitter.Duration)
a.Config.Agent.FlushInterval.Duration = jitterInterval(a.Config.Agent.FlushInterval.Duration,
a.Config.Agent.FlushJitter.Duration)

log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, "+
"Flush Interval:%s\n",
a.Interval, a.Debug, a.Hostname, a.FlushInterval)
a.Config.Agent.Interval, a.Config.Agent.Debug,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval)

// channel shared between all plugin threads for accumulating points
pointChan := make(chan *client.Point, 1000)

// Round collection to nearest interval by sleeping
if a.RoundInterval {
i := int64(a.Interval.Duration)
if a.Config.Agent.RoundInterval {
i := int64(a.Config.Agent.Interval.Duration)
time.Sleep(time.Duration(i - (time.Now().UnixNano() % i)))
}
ticker := time.NewTicker(a.Interval.Duration)
ticker := time.NewTicker(a.Config.Agent.Interval.Duration)

wg.Add(1)
go func() {
Expand Down
43 changes: 10 additions & 33 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/influxdb/telegraf/internal"
"github.com/influxdb/telegraf/internal/config"

// needing to load the plugins
Expand Down Expand Up @@ -85,12 +84,8 @@ func TestAgent_LoadOutput(t *testing.T) {
}

func TestAgent_ZeroJitter(t *testing.T) {
a := &Agent{
FlushInterval: internal.Duration{10 * time.Second},
FlushJitter: internal.Duration{0 * time.Second},
}
flushinterval := jitterInterval(a.FlushInterval.Duration,
a.FlushJitter.Duration)
flushinterval := jitterInterval(time.Duration(10*time.Second),
time.Duration(0*time.Second))

actual := flushinterval.Nanoseconds()
exp := time.Duration(10 * time.Second).Nanoseconds()
Expand All @@ -105,13 +100,8 @@ func TestAgent_ZeroInterval(t *testing.T) {
max := time.Duration(5 * time.Second).Nanoseconds()

for i := 0; i < 1000; i++ {
a := &Agent{
FlushInterval: internal.Duration{0 * time.Second},
FlushJitter: internal.Duration{5 * time.Second},
}

flushinterval := jitterInterval(a.FlushInterval.Duration,
a.FlushJitter.Duration)
flushinterval := jitterInterval(time.Duration(0*time.Second),
time.Duration(5*time.Second))
actual := flushinterval.Nanoseconds()

if actual > max {
Expand All @@ -126,13 +116,8 @@ func TestAgent_ZeroInterval(t *testing.T) {
}

func TestAgent_ZeroBoth(t *testing.T) {
a := &Agent{
FlushInterval: internal.Duration{0 * time.Second},
FlushJitter: internal.Duration{0 * time.Second},
}

flushinterval := jitterInterval(a.FlushInterval.Duration,
a.FlushJitter.Duration)
flushinterval := jitterInterval(time.Duration(0*time.Second),
time.Duration(0*time.Second))

actual := flushinterval
exp := time.Duration(500 * time.Millisecond)
Expand All @@ -146,12 +131,8 @@ func TestAgent_JitterMax(t *testing.T) {
max := time.Duration(32 * time.Second).Nanoseconds()

for i := 0; i < 1000; i++ {
a := &Agent{
FlushInterval: internal.Duration{30 * time.Second},
FlushJitter: internal.Duration{2 * time.Second},
}
flushinterval := jitterInterval(a.FlushInterval.Duration,
a.FlushJitter.Duration)
flushinterval := jitterInterval(time.Duration(30*time.Second),
time.Duration(2*time.Second))
actual := flushinterval.Nanoseconds()
if actual > max {
t.Errorf("Didn't expect interval %d to be > %d", actual, max)
Expand All @@ -164,12 +145,8 @@ func TestAgent_JitterMin(t *testing.T) {
min := time.Duration(30 * time.Second).Nanoseconds()

for i := 0; i < 1000; i++ {
a := &Agent{
FlushInterval: internal.Duration{30 * time.Second},
FlushJitter: internal.Duration{2 * time.Second},
}
flushinterval := jitterInterval(a.FlushInterval.Duration,
a.FlushJitter.Duration)
flushinterval := jitterInterval(time.Duration(30*time.Second),
time.Duration(2*time.Second))
actual := flushinterval.Nanoseconds()
if actual < min {
t.Errorf("Didn't expect interval %d to be < %d", actual, min)
Expand Down
2 changes: 1 addition & 1 deletion cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
}

if *fDebug {
ag.Debug = true
ag.Config.Agent.Debug = true
}

if *fTest {
Expand Down
Loading

0 comments on commit a5f2d5f

Please sign in to comment.