Skip to content

Commit

Permalink
Add Accumulator to the ServiceInput Start() function
Browse files Browse the repository at this point in the history
closes #666
  • Loading branch information
sparrc committed Feb 16, 2016
1 parent 7f539c9 commit 93106be
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 15 deletions.
7 changes: 5 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

// channel shared between all input threads for accumulating points
metricC := make(chan telegraf.Metric, 1000)
metricC := make(chan telegraf.Metric, 10000)

// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
Expand All @@ -342,7 +342,10 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
if err := p.Start(); err != nil {
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
Expand Down
2 changes: 1 addition & 1 deletion input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ServiceInput interface {
Gather(Accumulator) error

// Start starts the ServiceInput's service, whatever that may be
Start() error
Start(Accumulator) error

// Stop stops the services and closes any necessary channels and connections
Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/github_webhooks/github_webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() {
}
}

func (gh *GithubWebhooks) Start() error {
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
go gh.Listen()
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
return nil
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
k.parser = parser
}

func (k *Kafka) Start() error {
func (k *Kafka) Start(_ telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
var consumerErr error
Expand Down
12 changes: 6 additions & 6 deletions plugins/inputs/kafka_consumer/kafka_consumer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ func TestReadsMetricsFromKafka(t *testing.T) {
}
p, _ := parsers.NewInfluxParser()
k.SetParser(p)
if err := k.Start(); err != nil {

// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
if err := k.Start(&acc); err != nil {
t.Fatal(err.Error())
} else {
defer k.Stop()
}

waitForPoint(k, t)

// Verify that we can now gather the sent message
var acc testutil.Accumulator
// Sanity check
assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")

// Gather points
err = k.Gather(&acc)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
m.parser = parser
}

func (m *MQTTConsumer) Start() error {
func (m *MQTTConsumer) Start(_ telegraf.Accumulator) error {
m.Lock()
defer m.Unlock()
if m.QoS > 2 || m.QoS < 0 {
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var sampleConfig = `
queue_group = "telegraf_consumers"
### Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
### Data format to consume. This can be "json", "influx" or "graphite"
### Each data format has it's own unique set of configuration options, read
### more about them here:
Expand Down Expand Up @@ -84,7 +84,7 @@ func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
}

// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
func (n *natsConsumer) Start() error {
func (n *natsConsumer) Start(_ telegraf.Accumulator) error {
n.Lock()
defer n.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
return nil
}

func (s *Statsd) Start() error {
func (s *Statsd) Start(_ telegraf.Accumulator) error {
// Make data structures
s.done = make(chan struct{})
s.in = make(chan []byte, s.AllowedPendingMessages)
Expand Down

0 comments on commit 93106be

Please sign in to comment.