diff --git a/agent/agent.go b/agent/agent.go index 66fc140aed92e..97c6b01c8f470 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "os" "runtime" "sync" "time" @@ -516,16 +517,7 @@ func (a *Agent) runOutputs( wg.Add(1) go func(output *models.RunningOutput) { defer wg.Done() - - if a.Config.Agent.RoundInterval { - err := internal.SleepContext( - ctx, internal.AlignDuration(startTime, interval)) - if err != nil { - return - } - } - - a.flush(ctx, output, interval, jitter) + a.flushLoop(ctx, startTime, output, interval, jitter) }(output) } @@ -546,25 +538,39 @@ func (a *Agent) runOutputs( return nil } -// flush runs an output's flush function periodically until the context is +// flushLoop runs an output's flush function periodically until the context is // done. -func (a *Agent) flush( +func (a *Agent) flushLoop( ctx context.Context, + startTime time.Time, output *models.RunningOutput, interval time.Duration, jitter time.Duration, ) { - // since we are watching two channels we need a ticker with the jitter - // integrated. - ticker := NewTicker(interval, jitter) - defer ticker.Stop() - logError := func(err error) { if err != nil { log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err) } } + // watch for flush requests + flushRequested := make(chan os.Signal, 1) + watchForFlushSignal(flushRequested) + + // align to round interval + if a.Config.Agent.RoundInterval { + err := internal.SleepContext( + ctx, internal.AlignDuration(startTime, interval)) + if err != nil { + return + } + } + + // since we are watching two channels we need a ticker with the jitter + // integrated. + ticker := NewTicker(interval, jitter) + defer ticker.Stop() + for { // Favor shutdown over other methods. select { @@ -575,8 +581,13 @@ func (a *Agent) flush( } select { + case <-ctx.Done(): + logError(a.flushOnce(output, interval, output.Write)) + return case <-ticker.C: logError(a.flushOnce(output, interval, output.Write)) + case <-flushRequested: + logError(a.flushOnce(output, interval, output.Write)) case <-output.BatchReady: // Favor the ticker over batch ready select { @@ -585,9 +596,6 @@ func (a *Agent) flush( default: logError(a.flushOnce(output, interval, output.WriteBatch)) } - case <-ctx.Done(): - logError(a.flushOnce(output, interval, output.Write)) - return } } } diff --git a/agent/agent_notwindows.go b/agent/agent_notwindows.go new file mode 100644 index 0000000000000..4d1f496a93a35 --- /dev/null +++ b/agent/agent_notwindows.go @@ -0,0 +1,16 @@ +// +build !windows + +package agent + +import ( + "os" + "os/signal" + "syscall" +) + +const flushSignal = syscall.SIGUSR1 + +func watchForFlushSignal(flushRequested chan os.Signal) { + signal.Notify(flushRequested, flushSignal) + defer signal.Stop(flushRequested) +} diff --git a/agent/agent_windows.go b/agent/agent_windows.go new file mode 100644 index 0000000000000..7e7a0cabdf67f --- /dev/null +++ b/agent/agent_windows.go @@ -0,0 +1,9 @@ +// +build windows + +package agent + +import "os" + +func watchForFlushSignal(flushRequested chan os.Signal) { + // not implemented +} diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 7b013cc6cc72f..c1f7344da1717 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -10,7 +10,6 @@ import ( _ "net/http/pprof" // Comment this line to disable pprof endpoint. "os" "os/signal" - "runtime" "sort" "strings" "syscall" @@ -27,16 +26,16 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" _ "github.com/influxdata/telegraf/plugins/outputs/all" _ "github.com/influxdata/telegraf/plugins/processors/all" - "github.com/kardianos/service" ) +// If you update these, update usage.go and usage_windows.go var fDebug = flag.Bool("debug", false, "turn on debug logging") var pprofAddr = flag.String("pprof-addr", "", "pprof address to listen on, not activate pprof if empty") var fQuiet = flag.Bool("quiet", false, "run in quiet mode") -var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit") +var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit. Note: Test mode only runs inputs, not processors, aggregators, or outputs") var fTestWait = flag.Int("test-wait", 0, "wait up to this many seconds for service inputs to complete in test mode") var fConfig = flag.String("config", "", "configuration file to load") var fConfigDirectory = flag.String("config-directory", "", @@ -78,7 +77,6 @@ var ( var stop chan struct{} func reloadLoop( - stop chan struct{}, inputFilters []string, outputFilters []string, aggregatorFilters []string, @@ -91,7 +89,7 @@ func reloadLoop( ctx, cancel := context.WithCancel(context.Background()) - signals := make(chan os.Signal) + signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) go func() { @@ -208,32 +206,6 @@ func usageExit(rc int) { os.Exit(rc) } -type program struct { - inputFilters []string - outputFilters []string - aggregatorFilters []string - processorFilters []string -} - -func (p *program) Start(s service.Service) error { - go p.run() - return nil -} -func (p *program) run() { - stop = make(chan struct{}) - reloadLoop( - stop, - p.inputFilters, - p.outputFilters, - p.aggregatorFilters, - p.processorFilters, - ) -} -func (p *program) Stop(s service.Service) error { - close(stop) - return nil -} - func formatFullVersion() string { var parts = []string{"Telegraf"} @@ -380,80 +352,10 @@ func main() { log.Println("Telegraf version already configured to: " + internal.Version()) } - if runtime.GOOS == "windows" && windowsRunAsService() { - programFiles := os.Getenv("ProgramFiles") - if programFiles == "" { // Should never happen - programFiles = "C:\\Program Files" - } - svcConfig := &service.Config{ - Name: *fServiceName, - DisplayName: *fServiceDisplayName, - Description: "Collects data using a series of plugins and publishes it to" + - "another series of plugins.", - Arguments: []string{"--config", programFiles + "\\Telegraf\\telegraf.conf"}, - } - - prg := &program{ - inputFilters: inputFilters, - outputFilters: outputFilters, - aggregatorFilters: aggregatorFilters, - processorFilters: processorFilters, - } - s, err := service.New(prg, svcConfig) - if err != nil { - log.Fatal("E! " + err.Error()) - } - // Handle the --service flag here to prevent any issues with tooling that - // may not have an interactive session, e.g. installing from Ansible. - if *fService != "" { - if *fConfig != "" { - svcConfig.Arguments = []string{"--config", *fConfig} - } - if *fConfigDirectory != "" { - svcConfig.Arguments = append(svcConfig.Arguments, "--config-directory", *fConfigDirectory) - } - //set servicename to service cmd line, to have a custom name after relaunch as a service - svcConfig.Arguments = append(svcConfig.Arguments, "--service-name", *fServiceName) - - err := service.Control(s, *fService) - if err != nil { - log.Fatal("E! " + err.Error()) - } - os.Exit(0) - } else { - winlogger, err := s.Logger(nil) - if err == nil { - //When in service mode, register eventlog target andd setup default logging to eventlog - logger.RegisterEventLogger(winlogger) - logger.SetupLogging(logger.LogConfig{LogTarget: logger.LogTargetEventlog}) - } - err = s.Run() - - if err != nil { - log.Println("E! " + err.Error()) - } - } - } else { - stop = make(chan struct{}) - reloadLoop( - stop, - inputFilters, - outputFilters, - aggregatorFilters, - processorFilters, - ) - } -} - -// Return true if Telegraf should create a Windows service. -func windowsRunAsService() bool { - if *fService != "" { - return true - } - - if *fRunAsConsole { - return false - } - - return !service.Interactive() + run( + inputFilters, + outputFilters, + aggregatorFilters, + processorFilters, + ) } diff --git a/cmd/telegraf/telegraf_notwindows.go b/cmd/telegraf/telegraf_notwindows.go new file mode 100644 index 0000000000000..ca28622f16752 --- /dev/null +++ b/cmd/telegraf/telegraf_notwindows.go @@ -0,0 +1,13 @@ +// +build !windows + +package main + +func run(inputFilters, outputFilters, aggregatorFilters, processorFilters []string) { + stop = make(chan struct{}) + reloadLoop( + inputFilters, + outputFilters, + aggregatorFilters, + processorFilters, + ) +} diff --git a/cmd/telegraf/telegraf_windows.go b/cmd/telegraf/telegraf_windows.go new file mode 100644 index 0000000000000..eaf700ed0e52a --- /dev/null +++ b/cmd/telegraf/telegraf_windows.go @@ -0,0 +1,124 @@ +// +build windows + +package main + +import ( + "log" + "os" + "runtime" + + "github.com/influxdata/telegraf/logger" + "github.com/kardianos/service" +) + +func run(inputFilters, outputFilters, aggregatorFilters, processorFilters []string) { + if runtime.GOOS == "windows" && windowsRunAsService() { + runAsWindowsService( + inputFilters, + outputFilters, + aggregatorFilters, + processorFilters, + ) + } else { + stop = make(chan struct{}) + reloadLoop( + inputFilters, + outputFilters, + aggregatorFilters, + processorFilters, + ) + } +} + +type program struct { + inputFilters []string + outputFilters []string + aggregatorFilters []string + processorFilters []string +} + +func (p *program) Start(s service.Service) error { + go p.run() + return nil +} +func (p *program) run() { + stop = make(chan struct{}) + reloadLoop( + p.inputFilters, + p.outputFilters, + p.aggregatorFilters, + p.processorFilters, + ) +} +func (p *program) Stop(s service.Service) error { + close(stop) + return nil +} + +func runAsWindowsService(inputFilters, outputFilters, aggregatorFilters, processorFilters []string) { + programFiles := os.Getenv("ProgramFiles") + if programFiles == "" { // Should never happen + programFiles = "C:\\Program Files" + } + svcConfig := &service.Config{ + Name: *fServiceName, + DisplayName: *fServiceDisplayName, + Description: "Collects data using a series of plugins and publishes it to" + + "another series of plugins.", + Arguments: []string{"--config", programFiles + "\\Telegraf\\telegraf.conf"}, + } + + prg := &program{ + inputFilters: inputFilters, + outputFilters: outputFilters, + aggregatorFilters: aggregatorFilters, + processorFilters: processorFilters, + } + s, err := service.New(prg, svcConfig) + if err != nil { + log.Fatal("E! " + err.Error()) + } + // Handle the --service flag here to prevent any issues with tooling that + // may not have an interactive session, e.g. installing from Ansible. + if *fService != "" { + if *fConfig != "" { + svcConfig.Arguments = []string{"--config", *fConfig} + } + if *fConfigDirectory != "" { + svcConfig.Arguments = append(svcConfig.Arguments, "--config-directory", *fConfigDirectory) + } + //set servicename to service cmd line, to have a custom name after relaunch as a service + svcConfig.Arguments = append(svcConfig.Arguments, "--service-name", *fServiceName) + + err := service.Control(s, *fService) + if err != nil { + log.Fatal("E! " + err.Error()) + } + os.Exit(0) + } else { + winlogger, err := s.Logger(nil) + if err == nil { + //When in service mode, register eventlog target andd setup default logging to eventlog + logger.RegisterEventLogger(winlogger) + logger.SetupLogging(logger.LogConfig{LogTarget: logger.LogTargetEventlog}) + } + err = s.Run() + + if err != nil { + log.Println("E! " + err.Error()) + } + } +} + +// Return true if Telegraf should create a Windows service. +func windowsRunAsService() bool { + if *fService != "" { + return true + } + + if *fRunAsConsole { + return false + } + + return !service.Interactive() +} diff --git a/docs/OUTPUTS.md b/docs/OUTPUTS.md index 9d89491cc39d7..c60cd96ba7539 100644 --- a/docs/OUTPUTS.md +++ b/docs/OUTPUTS.md @@ -94,6 +94,19 @@ You should also add the following to your `SampleConfig()`: data_format = "influx" ``` +## Flushing Metrics to Outputs + +Metrics are flushed to outputs when any of the following events happen: +- `flush_interval + rand(flush_jitter)` has elapsed since start or the last flush interval +- At least `metric_batch_size` count of metrics are waiting in the buffer +- The telegraf process has received a SIGUSR1 signal + +Note that if the flush takes longer than the `agent.interval` to write the metrics +to the output, you'll see a message saying the output `did not complete within its +flush interval`. This may mean your output is not keeping up with the flow of metrics, +and you may want to look into enabling compression, reducing the size of your metrics, +or investigate other reasons why the writes might be taking longer than expected. + [file]: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/file [output data formats]: https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md [SampleConfig]: https://github.com/influxdata/telegraf/wiki/SampleConfig diff --git a/internal/usage.go b/internal/usage.go index 90ea929869f57..1240873434195 100644 --- a/internal/usage.go +++ b/internal/usage.go @@ -32,8 +32,9 @@ The commands & flags are: Valid values are 'agent', 'global_tags', 'outputs', 'processors', 'aggregators' and 'inputs' --sample-config print out full sample configuration - --test gather metrics, print them out, and exit; - processors, aggregators, and outputs are not run + --test enable test mode: gather metrics, print them out, + and exit. Note: Test mode only runs inputs, not + processors, aggregators, or outputs --test-wait wait up to this many seconds for service inputs to complete in test mode --usage print usage for a plugin, ie, 'telegraf --usage mysql' diff --git a/internal/usage_windows.go b/internal/usage_windows.go index af2506ec1e974..3ee2f7eff8813 100644 --- a/internal/usage_windows.go +++ b/internal/usage_windows.go @@ -29,8 +29,9 @@ The commands & flags are: --section-filter filter config sections to output, separator is : Valid values are 'agent', 'global_tags', 'outputs', 'processors', 'aggregators' and 'inputs' - --test gather metrics, print them out, and exit; - processors, aggregators, and outputs are not run + --test enable test mode: gather metrics, print them out, + and exit. Note: Test mode only runs inputs, not + processors, aggregators, or outputs --test-wait wait up to this many seconds for service inputs to complete in test mode --usage print usage for a plugin, ie, 'telegraf --usage mysql'