Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix nagios parser to parse all perfdata and report info message. #5601

Merged
merged 6 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Append nagios status after parsing the output
  • Loading branch information
Sergey Khegay committed Mar 25, 2019
commit eca941094e8efedd885668579f8b8ff4bd764702
88 changes: 43 additions & 45 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package exec
import (
"bytes"
"fmt"
"log"
"os/exec"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -60,19 +61,18 @@ func NewExec() *Exec {
}

type Runner interface {
Run(*Exec, string, telegraf.Accumulator) ([]byte, error)
Run(string, time.Duration) ([]byte, []byte, error)
}

type CommandRunner struct{}

func (c CommandRunner) Run(
e *Exec,
command string,
acc telegraf.Accumulator,
) ([]byte, error) {
timeout time.Duration,
) ([]byte, []byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
return nil, nil, fmt.Errorf("exec: unable to parse command, %s", err)
}

cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
Expand All @@ -84,47 +84,35 @@ func (c CommandRunner) Run(
cmd.Stdout = &out
cmd.Stderr = &stderr

_, isNagiosParser := e.parser.(*nagios.NagiosParser)

err = internal.RunTimeout(cmd, e.Timeout.Duration)
nagiosErr := err
if err != nil && !isNagiosParser {
var errMessage = ""
if stderr.Len() > 0 {
stderr = removeCarriageReturns(stderr)
// Limit the number of bytes.
didTruncate := false
if stderr.Len() > MaxStderrBytes {
stderr.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(stderr.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < stderr.Len()-1 {
didTruncate = true
}
stderr.Truncate(i)
}
if didTruncate {
stderr.WriteString("...")
}
runErr := internal.RunTimeout(cmd, timeout)

errMessage = fmt.Sprintf(": %s", stderr.String())
}
return nil, fmt.Errorf("exec: %s for command '%s'%s", err, command, errMessage)
out = removeCarriageReturns(out)
if stderr.Len() > 0 {
stderr = removeCarriageReturns(stderr)
stderr = truncate(stderr)
}

out = removeCarriageReturns(out)
return out.Bytes(), stderr.Bytes(), runErr
}

if isNagiosParser {
exitCode, err := nagios.GetExitCode(nagiosErr)
if err != nil {
return nil, fmt.Errorf("exec: get exit code: %s", err)
func truncate(buf bytes.Buffer) bytes.Buffer {
// Limit the number of bytes.
didTruncate := false
if buf.Len() > MaxStderrBytes {
buf.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < buf.Len()-1 {
didTruncate = true
}
nagios.AppendExitCode(&out, exitCode)
buf.Truncate(i)
}

return out.Bytes(), nil
if didTruncate {
buf.WriteString("...")
}
return buf
}

// removeCarriageReturns removes all carriage returns from the input if the
Expand Down Expand Up @@ -155,21 +143,31 @@ func removeCarriageReturns(b bytes.Buffer) bytes.Buffer {

func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer wg.Done()
_, isNagios := e.parser.(*nagios.NagiosParser)

out, err := e.runner.Run(e, command, acc)
if err != nil {
out, errbuf, runErr := e.runner.Run(command, e.Timeout.Duration)
if !isNagios && runErr != nil {
err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errbuf))
acc.AddError(err)
return
}

metrics, err := e.parser.Parse(out)
if err != nil {
acc.AddError(err)
} else {
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
return
}

if isNagios {
metrics, err = nagios.TryAddState(runErr, metrics)
if err != nil {
log.Printf("E! [inputs.exec] failed to add nagios state: %s", err)
}
}

for _, m := range metrics {
acc.AddMetric(m)
}
}

func (e *Exec) SampleConfig() string {
Expand Down
87 changes: 73 additions & 14 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"runtime"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"

"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -74,22 +74,21 @@ var crTests = []CarriageReturnTest{
}

type runnerMock struct {
out []byte
err error
out []byte
errout []byte
err error
}

func newRunnerMock(out []byte, err error) Runner {
func newRunnerMock(out []byte, errout []byte, err error) Runner {
return &runnerMock{
out: out,
err: err,
out: out,
errout: errout,
err: err,
}
}

func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
return r.out, nil
func (r runnerMock) Run(command string, _ time.Duration) ([]byte, []byte, error) {
return r.out, r.errout, r.err
}

func TestExec(t *testing.T) {
Expand All @@ -98,7 +97,7 @@ func TestExec(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
runner: newRunnerMock([]byte(validJson), nil, nil),
Commands: []string{"testcommand arg1"},
parser: parser,
}
Expand Down Expand Up @@ -127,7 +126,7 @@ func TestExecMalformed(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
runner: newRunnerMock([]byte(malformedJson), nil, nil),
Commands: []string{"badcommand arg1"},
parser: parser,
}
Expand All @@ -143,7 +142,7 @@ func TestCommandError(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
runner: newRunnerMock(nil, nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
parser: parser,
}
Expand Down Expand Up @@ -201,6 +200,66 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
acc.AssertContainsFields(t, "metric", fields)
}

func TestTruncate(t *testing.T) {
tests := []struct {
name string
bufF func() *bytes.Buffer
expF func() *bytes.Buffer
}{
{
name: "should not truncate",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
return &b
},
},
{
name: "should truncate up to the new line",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world\nand all the people")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world...")
return &b
},
},
{
name: "should truncate to the MaxStderrBytes",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < 2*MaxStderrBytes; i++ {
b.WriteByte('b')
}
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < MaxStderrBytes; i++ {
b.WriteByte('b')
}
b.WriteString("...")
return &b
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res := truncate(*tt.bufF())
require.Equal(t, tt.expF().Bytes(), res.Bytes())
})
}
}

func TestRemoveCarriageReturns(t *testing.T) {
if runtime.GOOS == "windows" {
// Test that all carriage returns are removed
Expand Down
62 changes: 30 additions & 32 deletions plugins/parsers/nagios/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package nagios
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"log"
"os/exec"
"regexp"
Expand All @@ -17,9 +17,9 @@ import (
"github.com/influxdata/telegraf/metric"
)

// GetExitCode get the exit code from an error value which is the result
// getExitCode get the exit code from an error value which is the result
// of running a command through exec package api.
func GetExitCode(err error) (int, error) {
func getExitCode(err error) (int, error) {
if err == nil {
return 0, nil
}
Expand All @@ -40,35 +40,37 @@ func GetExitCode(err error) (int, error) {
return ws.ExitStatus(), nil
}

// AppendExitCode appends exit code to the given buffer.
// The exit code is encoded as uint64 in the little endian notation. An
// empty byte is inserted before the 8 bytes of the code.
//
// See ExtractExitCode.
func AppendExitCode(buf *bytes.Buffer, exitCode int) {
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, uint64(exitCode))

buf.WriteByte(0)
buf.Write(bytes)
}

const defaultExitCode int = 0
// TryAddState attempts to add a state derived from the runErr.
// If any error occurs, it is guaranteed to be returned along with
// the initial metric slice.
func TryAddState(runErr error, metrics []telegraf.Metric) ([]telegraf.Metric, error) {
state, err := getExitCode(runErr)
if err != nil {
return metrics, fmt.Errorf("exec: get exit code: %s", err)
}

// ExtractExitCode extracts exit code from the given byte slice.
//
// See AppendExitCode.
func ExtractExitCode(buf []byte) ([]byte, int) {
n := len(buf)
for _, m := range metrics {
if m.Name() == "nagios_state" {
m.AddField("state", state)
return metrics, nil
}
}

if n < 9 {
return buf, defaultExitCode
var ts time.Time
if len(metrics) != 0 {
ts = metrics[0].Time()
} else {
ts = time.Now().UTC()
}
if buf[n-9] == 0 {
return buf[:n-9], int(binary.LittleEndian.Uint64(buf[n-8:]))
f := map[string]interface{}{
"state": state,
}

return buf, defaultExitCode
m, err := metric.New("nagios_state", nil, f, ts)
if err != nil {
return metrics, err
}
metrics = append(metrics, m)
return metrics, nil
}

type NagiosParser struct {
Expand All @@ -95,9 +97,6 @@ func (p *NagiosParser) SetDefaultTags(tags map[string]string) {
func (p *NagiosParser) Parse(buf []byte) ([]telegraf.Metric, error) {
ts := time.Now().UTC()

var state int
buf, state = ExtractExitCode(buf)

s := bufio.NewScanner(bytes.NewReader(buf))

var msg bytes.Buffer
Expand Down Expand Up @@ -161,7 +160,6 @@ func (p *NagiosParser) Parse(buf []byte) ([]telegraf.Metric, error) {

// Create nagios state.
fields := map[string]interface{}{
"state": state,
"service_output": msg.String(),
}
if longmsg.Len() != 0 {
Expand Down
Loading