Skip to content

Commit

Permalink
add support for SIGUSR1 to trigger flush (influxdata#7366)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored Apr 20, 2020
1 parent 6c72c64 commit 819481b
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 131 deletions.
48 changes: 28 additions & 20 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"os"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -516,16 +517,7 @@ func (a *Agent) runOutputs(
wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()

if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

a.flush(ctx, output, interval, jitter)
a.flushLoop(ctx, startTime, output, interval, jitter)
}(output)
}

Expand All @@ -546,25 +538,39 @@ func (a *Agent) runOutputs(
return nil
}

// flush runs an output's flush function periodically until the context is
// flushLoop runs an output's flush function periodically until the context is
// done.
func (a *Agent) flush(
func (a *Agent) flushLoop(
ctx context.Context,
startTime time.Time,
output *models.RunningOutput,
interval time.Duration,
jitter time.Duration,
) {
// since we are watching two channels we need a ticker with the jitter
// integrated.
ticker := NewTicker(interval, jitter)
defer ticker.Stop()

logError := func(err error) {
if err != nil {
log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err)
}
}

// watch for flush requests
flushRequested := make(chan os.Signal, 1)
watchForFlushSignal(flushRequested)

// align to round interval
if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

// since we are watching two channels we need a ticker with the jitter
// integrated.
ticker := NewTicker(interval, jitter)
defer ticker.Stop()

for {
// Favor shutdown over other methods.
select {
Expand All @@ -575,8 +581,13 @@ func (a *Agent) flush(
}

select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
return
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
case <-flushRequested:
logError(a.flushOnce(output, interval, output.Write))
case <-output.BatchReady:
// Favor the ticker over batch ready
select {
Expand All @@ -585,9 +596,6 @@ func (a *Agent) flush(
default:
logError(a.flushOnce(output, interval, output.WriteBatch))
}
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
return
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions agent/agent_notwindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// +build !windows

package agent

import (
"os"
"os/signal"
"syscall"
)

const flushSignal = syscall.SIGUSR1

func watchForFlushSignal(flushRequested chan os.Signal) {
signal.Notify(flushRequested, flushSignal)
defer signal.Stop(flushRequested)
}
9 changes: 9 additions & 0 deletions agent/agent_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build windows

package agent

import "os"

func watchForFlushSignal(flushRequested chan os.Signal) {
// not implemented
}
116 changes: 9 additions & 107 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
_ "net/http/pprof" // Comment this line to disable pprof endpoint.
"os"
"os/signal"
"runtime"
"sort"
"strings"
"syscall"
Expand All @@ -27,16 +26,16 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/kardianos/service"
)

// If you update these, update usage.go and usage_windows.go
var fDebug = flag.Bool("debug", false,
"turn on debug logging")
var pprofAddr = flag.String("pprof-addr", "",
"pprof address to listen on, not activate pprof if empty")
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit")
var fTest = flag.Bool("test", false, "enable test mode: gather metrics, print them out, and exit. Note: Test mode only runs inputs, not processors, aggregators, or outputs")
var fTestWait = flag.Int("test-wait", 0, "wait up to this many seconds for service inputs to complete in test mode")
var fConfig = flag.String("config", "", "configuration file to load")
var fConfigDirectory = flag.String("config-directory", "",
Expand Down Expand Up @@ -78,7 +77,6 @@ var (
var stop chan struct{}

func reloadLoop(
stop chan struct{},
inputFilters []string,
outputFilters []string,
aggregatorFilters []string,
Expand All @@ -91,7 +89,7 @@ func reloadLoop(

ctx, cancel := context.WithCancel(context.Background())

signals := make(chan os.Signal)
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP,
syscall.SIGTERM, syscall.SIGINT)
go func() {
Expand Down Expand Up @@ -208,32 +206,6 @@ func usageExit(rc int) {
os.Exit(rc)
}

type program struct {
inputFilters []string
outputFilters []string
aggregatorFilters []string
processorFilters []string
}

func (p *program) Start(s service.Service) error {
go p.run()
return nil
}
func (p *program) run() {
stop = make(chan struct{})
reloadLoop(
stop,
p.inputFilters,
p.outputFilters,
p.aggregatorFilters,
p.processorFilters,
)
}
func (p *program) Stop(s service.Service) error {
close(stop)
return nil
}

func formatFullVersion() string {
var parts = []string{"Telegraf"}

Expand Down Expand Up @@ -380,80 +352,10 @@ func main() {
log.Println("Telegraf version already configured to: " + internal.Version())
}

if runtime.GOOS == "windows" && windowsRunAsService() {
programFiles := os.Getenv("ProgramFiles")
if programFiles == "" { // Should never happen
programFiles = "C:\\Program Files"
}
svcConfig := &service.Config{
Name: *fServiceName,
DisplayName: *fServiceDisplayName,
Description: "Collects data using a series of plugins and publishes it to" +
"another series of plugins.",
Arguments: []string{"--config", programFiles + "\\Telegraf\\telegraf.conf"},
}

prg := &program{
inputFilters: inputFilters,
outputFilters: outputFilters,
aggregatorFilters: aggregatorFilters,
processorFilters: processorFilters,
}
s, err := service.New(prg, svcConfig)
if err != nil {
log.Fatal("E! " + err.Error())
}
// Handle the --service flag here to prevent any issues with tooling that
// may not have an interactive session, e.g. installing from Ansible.
if *fService != "" {
if *fConfig != "" {
svcConfig.Arguments = []string{"--config", *fConfig}
}
if *fConfigDirectory != "" {
svcConfig.Arguments = append(svcConfig.Arguments, "--config-directory", *fConfigDirectory)
}
//set servicename to service cmd line, to have a custom name after relaunch as a service
svcConfig.Arguments = append(svcConfig.Arguments, "--service-name", *fServiceName)

err := service.Control(s, *fService)
if err != nil {
log.Fatal("E! " + err.Error())
}
os.Exit(0)
} else {
winlogger, err := s.Logger(nil)
if err == nil {
//When in service mode, register eventlog target andd setup default logging to eventlog
logger.RegisterEventLogger(winlogger)
logger.SetupLogging(logger.LogConfig{LogTarget: logger.LogTargetEventlog})
}
err = s.Run()

if err != nil {
log.Println("E! " + err.Error())
}
}
} else {
stop = make(chan struct{})
reloadLoop(
stop,
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
}
}

// Return true if Telegraf should create a Windows service.
func windowsRunAsService() bool {
if *fService != "" {
return true
}

if *fRunAsConsole {
return false
}

return !service.Interactive()
run(
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
}
13 changes: 13 additions & 0 deletions cmd/telegraf/telegraf_notwindows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// +build !windows

package main

func run(inputFilters, outputFilters, aggregatorFilters, processorFilters []string) {
stop = make(chan struct{})
reloadLoop(
inputFilters,
outputFilters,
aggregatorFilters,
processorFilters,
)
}
Loading

0 comments on commit 819481b

Please sign in to comment.