Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs): Add framework to retry on startup errors #14884

Merged
merged 13 commits into from
Mar 26, 2024
25 changes: 14 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,17 @@ func (a *Agent) startOutputs(
src := make(chan telegraf.Metric, 100)
unit := &outputUnit{src: src}
for _, output := range outputs {
err := a.connectOutput(ctx, output)
if err != nil {
for _, output := range unit.outputs {
if err := a.connectOutput(ctx, output); err != nil {
var fatalErr *internal.FatalError
if errors.As(err, &fatalErr) {
// If the model tells us to remove the plugin we do so without error
log.Printf("I! [agent] Failed to connect to [%s], error was %q; shutting down plugin...", output.LogName(), err)
output.Close()
continue
}

for _, unitOutput := range unit.outputs {
unitOutput.Close()
}
return nil, nil, fmt.Errorf("connecting output %s: %w", output.LogName(), err)
}
Expand All @@ -810,18 +817,14 @@ func (a *Agent) startOutputs(
// connectOutputs connects to all outputs.
func (a *Agent) connectOutput(ctx context.Context, output *models.RunningOutput) error {
log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
err := output.Output.Connect()
if err != nil {
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, "+
"error was %q", output.LogName(), err)
if err := output.Connect(); err != nil {
log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, error was %q", output.LogName(), err)

err := internal.SleepContext(ctx, 15*time.Second)
if err != nil {
if err := internal.SleepContext(ctx, 15*time.Second); err != nil {
return err
}

err = output.Output.Connect()
if err != nil {
if err = output.Connect(); err != nil {
return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,7 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
c.getFieldString(tbl, "name_override", &oc.NameOverride)
c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)

if c.hasErrs() {
return nil, c.firstErr()
Expand All @@ -1505,7 +1506,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"name_override", "name_prefix", "name_suffix", "namedrop", "namedrop_separator", "namepass", "namepass_separator",
"order",
"pass", "period", "precision",
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags":
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags", "startup_error_behavior":

// Secret-store options to ignore
case "id":
Expand Down
39 changes: 39 additions & 0 deletions internal/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package internal

import "errors"

var ErrNotConnected = errors.New("not connected")

// StartupError indicates an error that occurred during startup of a plugin
// e.g. due to connectivity issues or resources being not yet available.
// In case the 'Retry' flag is set, the startup of the plugin might be retried
// depending on the configured startup-error-behavior. The 'RemovePlugin'
// flag denotes if the agent should remove the plugin from further processing.
type StartupError struct {
Err error
Retry bool
Partial bool
}

func (e *StartupError) Error() string {
return e.Err.Error()
}

func (e *StartupError) Unwrap() error {
return e.Err
}

// FatalError indicates a not-recoverable error in the plugin. The corresponding
// plugin should be remove by the agent stopping any further processing for that
// plugin instance.
type FatalError struct {
Err error
}

func (e *FatalError) Error() string {
return e.Err.Error()
}

func (e *FatalError) Unwrap() error {
return e.Err
}
107 changes: 91 additions & 16 deletions models/running_output.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package models

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/selfstat"
)

Expand All @@ -19,10 +22,11 @@ const (

// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Alias string
ID string
Filter Filter
Name string
Alias string
ID string
StartupErrorBehavior string
Filter Filter

FlushInterval time.Duration
FlushJitter time.Duration
Expand All @@ -47,12 +51,16 @@ type RunningOutput struct {

MetricsFiltered selfstat.Stat
WriteTime selfstat.Stat
StartupErrors selfstat.Stat

BatchReady chan time.Time

buffer *Buffer
log telegraf.Logger

started bool
retries uint64

aggMutex sync.Mutex
}

Expand Down Expand Up @@ -104,6 +112,11 @@ func NewRunningOutput(
"write_time_ns",
tags,
),
StartupErrors: selfstat.Register(
"write",
"startup_errors",
tags,
),
log: logger,
}

Expand All @@ -119,7 +132,20 @@ func (r *RunningOutput) metricFiltered(metric telegraf.Metric) {
metric.Drop()
}

func (r *RunningOutput) ID() string {
if p, ok := r.Output.(telegraf.PluginWithID); ok {
return p.ID()
}
return r.Config.ID
}

func (r *RunningOutput) Init() error {
switch r.Config.StartupErrorBehavior {
case "", "error", "retry", "ignore":
default:
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}

if p, ok := r.Output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
Expand All @@ -129,11 +155,41 @@ func (r *RunningOutput) Init() error {
return nil
}

func (r *RunningOutput) ID() string {
if p, ok := r.Output.(telegraf.PluginWithID); ok {
return p.ID()
func (r *RunningOutput) Connect() error {
// Try to connect and exit early on success
err := r.Output.Connect()
if err == nil {
r.started = true
return nil
}
r.StartupErrors.Incr(1)

// Check if the plugin reports a retry-able error, otherwise we exit.
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry {
return err
}

// Handle the retry-able error depending on the configured behavior
switch r.Config.StartupErrorBehavior {
case "", "error": // fall-trough to return the actual error
case "retry":
r.log.Infof("Connect failed: %v; retrying...", err)
return nil
case "ignore":
return &internal.FatalError{Err: serr}
default:
r.log.Errorf("Invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}

return err
}

// Close closes the output
func (r *RunningOutput) Close() {
if err := r.Output.Close(); err != nil {
r.log.Errorf("Error closing output: %v", err)
}
return r.Config.ID
}

// AddMetric adds a metric to the output.
Expand Down Expand Up @@ -188,6 +244,22 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
// Write writes all metrics to the output, stopping when all have been sent on
// or error.
func (r *RunningOutput) Write() error {
// Try to connect if we are not yet started up
if !r.started {
r.retries++
if err := r.Output.Connect(); err != nil {
var serr *internal.StartupError
if !errors.As(err, &serr) || !serr.Retry || !serr.Partial {
r.StartupErrors.Incr(1)
return internal.ErrNotConnected
}
r.log.Debugf("Partially connected after %d attempts", r.retries)
} else {
r.started = true
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}
}

if output, ok := r.Output.(telegraf.AggregatingOutput); ok {
r.aggMutex.Lock()
metrics := output.Push()
Expand Down Expand Up @@ -220,6 +292,17 @@ func (r *RunningOutput) Write() error {

// WriteBatch writes a single batch of metrics to the output.
func (r *RunningOutput) WriteBatch() error {
// Try to connect if we are not yet started up
if !r.started {
r.retries++
if err := r.Output.Connect(); err != nil {
r.StartupErrors.Incr(1)
return internal.ErrNotConnected
}
r.started = true
r.log.Debugf("Successfully connected after %d attempts", r.retries)
}

batch := r.buffer.Batch(r.MetricBatchSize)
if len(batch) == 0 {
return nil
Expand All @@ -235,14 +318,6 @@ func (r *RunningOutput) WriteBatch() error {
return nil
}

// Close closes the output
func (r *RunningOutput) Close() {
err := r.Output.Close()
if err != nil {
r.log.Errorf("Error closing output: %v", err)
}
}

func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error {
dropped := atomic.LoadInt64(&r.droppedMetrics)
if dropped > 0 {
Expand Down
Loading
Loading