Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add call to optional Init function for all plugins #5899

Merged
merged 2 commits into from
Jun 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ func (a *Agent) Run(ctx context.Context) error {
return ctx.Err()
}

log.Printf("D! [agent] Initializing plugins")
err := a.initPlugins()
if err != nil {
return err
}

log.Printf("D! [agent] Connecting outputs")
err := a.connectOutputs(ctx)
err = a.connectOutputs(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -169,6 +175,11 @@ func (a *Agent) Test(ctx context.Context) error {
}()

for _, input := range a.Config.Inputs {
err := input.Init()
if err != nil {
return err
}

select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -578,6 +589,39 @@ func (a *Agent) flushOnce(

}

// initPlugins runs the Init function on plugins.
func (a *Agent) initPlugins() error {
for _, input := range a.Config.Inputs {
err := input.Init()
if err != nil {
return fmt.Errorf("could not initialize input %s: %v",
input.Config.Name, err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {
return fmt.Errorf("could not initialize processor %s: %v",
processor.Config.Name, err)
}
}
for _, aggregator := range a.Config.Aggregators {
err := aggregator.Init()
if err != nil {
return fmt.Errorf("could not initialize aggregator %s: %v",
aggregator.Config.Name, err)
}
}
for _, output := range a.Config.Outputs {
err := output.Init()
if err != nil {
return fmt.Errorf("could not initialize output %s: %v",
output.Config.Name, err)
}
}
return nil
}

// connectOutputs connects to all outputs.
func (a *Agent) connectOutputs(ctx context.Context) error {
for _, output := range a.Config.Outputs {
Expand Down
4 changes: 4 additions & 0 deletions docs/AGGREGATORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var sampleConfig = `
drop_original = false
`

func (m *Min) Init() error {
return nil
}

func (m *Min) SampleConfig() string {
return sampleConfig
}
Expand Down
4 changes: 4 additions & 0 deletions docs/INPUTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (s *Simple) SampleConfig() string {
`
}

func (s *Simple) Init() error {
return nil
}

func (s *Simple) Gather(acc telegraf.Accumulator) error {
if s.Ok {
acc.AddFields("state", map[string]interface{}{"value": "pretty good"}, nil)
Expand Down
4 changes: 4 additions & 0 deletions docs/OUTPUTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (s *Simple) SampleConfig() string {
`
}

func (s *Simple) Init() error {
return nil
}

func (s *Simple) Connect() error {
// Make a connection to the URL here
return nil
Expand Down
4 changes: 4 additions & 0 deletions docs/PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (p *Printer) Description() string {
return "Print all metrics that pass through this filter."
}

func (p *Printer) Init() error {
return nil
}

func (p *Printer) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
fmt.Println(metric.String())
Expand Down
9 changes: 9 additions & 0 deletions input.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package telegraf

// Initializer is an interface that all plugin types: Inputs, Outputs,
// Processors, and Aggregators can optionally implement to initialize the
// plugin.
type Initializer interface {
// Init performs one time setup of the plugin and returns an error if the
// configuration is invalid.
Init() error
}

type Input interface {
// SampleConfig returns the default configuration of the Input
SampleConfig() string
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func (r *RunningAggregator) Name() string {
return "aggregators." + r.Config.Name
}

func (r *RunningAggregator) Init() error {
if p, ok := r.Aggregator.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

func (r *RunningAggregator) Period() time.Duration {
return r.Config.Period
}
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ func (r *RunningInput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (r *RunningInput) Init() error {
if p, ok := r.Input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
if ok := r.Config.Filter.Select(metric); !ok {
r.metricFiltered(metric)
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ func (ro *RunningOutput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (ro *RunningOutput) Init() error {
if p, ok := ro.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

// AddMetric adds a metric to the output.
//
// Takes ownership of metric
Expand Down
10 changes: 10 additions & 0 deletions internal/models/running_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ func containsMetric(item telegraf.Metric, metrics []telegraf.Metric) bool {
return false
}

func (rp *RunningProcessor) Init() error {
if p, ok := rp.Processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return err
}
}
return nil
}

func (rp *RunningProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
rp.Lock()
defer rp.Unlock()
Expand Down
33 changes: 15 additions & 18 deletions plugins/inputs/http/http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package http

import (
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -89,27 +88,25 @@ func (*HTTP) Description() string {
return "Read formatted metrics from one or more HTTP endpoints"
}

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
if h.parser == nil {
return errors.New("Parser is not set")
func (h *HTTP) Init() error {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return err
}

if h.client == nil {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return err
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
},
Timeout: h.Timeout.Duration,
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
},
Timeout: h.Timeout.Duration,
}
return nil
}

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
for _, u := range h.URLs {
wg.Add(1)
Expand Down
24 changes: 5 additions & 19 deletions plugins/inputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestHTTPwithJSONFormat(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather))

require.Len(t, acc.Metrics, 1)
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestHTTPHeaders(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather))
}

Expand All @@ -100,6 +102,7 @@ func TestInvalidStatusCode(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.Error(t, acc.GatherError(plugin.Gather))
}

Expand All @@ -125,28 +128,10 @@ func TestMethod(t *testing.T) {
plugin.SetParser(p)

var acc testutil.Accumulator
plugin.Init()
require.NoError(t, acc.GatherError(plugin.Gather))
}

func TestParserNotSet(t *testing.T) {
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, _ = w.Write([]byte(simpleJSON))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()

url := fakeServer.URL + "/endpoint"
plugin := &plugin.HTTP{
URLs: []string{url},
}

var acc testutil.Accumulator
require.Error(t, acc.GatherError(plugin.Gather))
}

const simpleJSON = `
{
"a": 1.2
Expand Down Expand Up @@ -237,6 +222,7 @@ func TestBodyAndContentEncoding(t *testing.T) {
tt.plugin.SetParser(parser)

var acc testutil.Accumulator
tt.plugin.Init()
err = tt.plugin.Gather(&acc)
require.NoError(t, err)
})
Expand Down