Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close running outputs when reloading #8769

Merged
merged 10 commits into from
Mar 16, 2021
41 changes: 26 additions & 15 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
)

var stop chan struct{}
var ag *agent.Agent
viperstars marked this conversation as resolved.
Show resolved Hide resolved

func reloadLoop(
inputFilters []string,
Expand Down Expand Up @@ -106,8 +107,16 @@ func reloadLoop(
cancel()
}
}()

err := runAgent(ctx, inputFilters, outputFilters)
if ag != nil {
viperstars marked this conversation as resolved.
Show resolved Hide resolved
if ag.Config != nil {
for idx := range ag.Config.Outputs {
log.Printf("I! Closing running input: %s", ag.Config.Outputs[idx].LogName())
viperstars marked this conversation as resolved.
Show resolved Hide resolved
ag.Config.Outputs[idx].Close()
}
}
}
var err error
ag, err = runAgent(ctx, inputFilters, outputFilters)
if err != nil && err != context.Canceled {
log.Fatalf("E! [telegraf] Error running agent: %v", err)
}
Expand All @@ -117,44 +126,46 @@ func reloadLoop(
func runAgent(ctx context.Context,
inputFilters []string,
outputFilters []string,
) error {
) (*agent.Agent, error) {
var ag *agent.Agent
viperstars marked this conversation as resolved.
Show resolved Hide resolved
var err error
viperstars marked this conversation as resolved.
Show resolved Hide resolved
log.Printf("I! Starting Telegraf %s", version)

// If no other options are specified, load the config file and run.
c := config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
err := c.LoadConfig(*fConfig)
err = c.LoadConfig(*fConfig)
if err != nil {
return err
return ag, err
}

if *fConfigDirectory != "" {
err = c.LoadDirectory(*fConfigDirectory)
if err != nil {
return err
return ag, err
}
}
if !*fTest && len(c.Outputs) == 0 {
return errors.New("Error: no outputs found, did you provide a valid config file?")
return ag, errors.New("Error: no outputs found, did you provide a valid config file?")
}
if *fPlugins == "" && len(c.Inputs) == 0 {
return errors.New("Error: no inputs found, did you provide a valid config file?")
return ag, errors.New("Error: no inputs found, did you provide a valid config file?")
}

if int64(c.Agent.Interval.Duration) <= 0 {
return fmt.Errorf("Agent interval must be positive, found %s",
return ag, fmt.Errorf("Agent interval must be positive, found %s",
c.Agent.Interval.Duration)
}

if int64(c.Agent.FlushInterval.Duration) <= 0 {
return fmt.Errorf("Agent flush_interval must be positive; found %s",
return ag, fmt.Errorf("Agent flush_interval must be positive; found %s",
c.Agent.Interval.Duration)
}

ag, err := agent.NewAgent(c)
ag, err = agent.NewAgent(c)
if err != nil {
return err
return ag, err
}

// Setup logging as configured.
Expand All @@ -172,12 +183,12 @@ func runAgent(ctx context.Context,

if *fRunOnce {
wait := time.Duration(*fTestWait) * time.Second
return ag.Once(ctx, wait)
return ag, ag.Once(ctx, wait)
}

if *fTest || *fTestWait != 0 {
wait := time.Duration(*fTestWait) * time.Second
return ag.Test(ctx, wait)
return ag, ag.Test(ctx, wait)
}

log.Printf("I! Loaded inputs: %s", strings.Join(c.InputNames(), " "))
Expand All @@ -204,7 +215,7 @@ func runAgent(ctx context.Context,
}
}

return ag.Run(ctx)
return ag, ag.Run(ctx)
}

func usageExit(rc int) {
Expand Down