fix: directory monitor input plugin when data format is CSV and csv_skip_rows>0 and csv_header_row_count>=1 (influxdata#9865)
etycomputer authored Nov 16, 2021
1 parent b9e4978 commit db86904
Showing 6 changed files with 499 additions and 92 deletions.
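For orientation (not part of the commit): below is a minimal sketch of the configuration and per-line parsing pattern this fix is about. It reuses only the parsers.Config fields, parsers.NewParser, Parse, and io.EOF handling that appear in the diff and tests further down, plus the sample input from TestCSVSkipRows. How the CSV parser accounts for skip and header rows across successive Parse calls is changed on the parser side of this PR, which is not shown on this page, so treat the exact return values for skip and header rows as an assumption.

package main

import (
    "bufio"
    "errors"
    "fmt"
    "io"
    "strings"

    "github.com/influxdata/telegraf/plugins/parsers"
)

func main() {
    // The combination this commit fixes: csv_skip_rows > 0 together with
    // csv_header_row_count >= 1 (the same settings TestCSVSkipRows uses).
    cfg := parsers.Config{
        DataFormat:        "csv",
        CSVHeaderRowCount: 1,
        CSVSkipRows:       2,
        CSVTagColumns:     []string{"line1"},
    }
    parser, err := parsers.NewParser(&cfg)
    if err != nil {
        panic(err)
    }

    input := "garbage nonsense 1\n" +
        "garbage,nonsense,2\n" +
        "line1,line2,line3\n" +
        "hello,80,test_name2"

    // Feed the data line by line, as directory_monitor and tail now do: every
    // line goes through Parse, and the parser is expected to keep track of how
    // many skip and header rows it has already consumed.
    scanner := bufio.NewScanner(strings.NewReader(input))
    for scanner.Scan() {
        metrics, err := parser.Parse(scanner.Bytes())
        if err != nil {
            if errors.Is(err, io.EOF) {
                continue // mirrors the plugin code: line consumed, no metric produced
            }
            panic(err)
        }
        for _, m := range metrics {
            fmt.Println(m.Tags(), m.Fields())
        }
    }
}

With the fix in place, only the last line should yield a metric, tagged line1=hello with fields line2=80 and line3=test_name2, matching the expectations in the new tests below.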
24 changes: 7 additions & 17 deletions plugins/inputs/directory_monitor/directory_monitor.go
@@ -261,15 +261,12 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
 }

 func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error {
-    // Read the file line-by-line and parse with the configured parse method.
-    firstLine := true
     scanner := bufio.NewScanner(reader)
     for scanner.Scan() {
-        metrics, err := monitor.parseLine(parser, scanner.Bytes(), firstLine)
+        metrics, err := monitor.parseLine(parser, scanner.Bytes())
         if err != nil {
             return err
         }
-        firstLine = false

         if monitor.FileTag != "" {
             for _, m := range metrics {
@@ -285,24 +282,17 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read
     return nil
 }

-func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte, firstLine bool) ([]telegraf.Metric, error) {
+func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte) ([]telegraf.Metric, error) {
     switch parser.(type) {
     case *csv.Parser:
-        // The CSV parser parses headers in Parse and skips them in ParseLine.
-        if firstLine {
-            return parser.Parse(line)
-        }
-
-        m, err := parser.ParseLine(string(line))
+        m, err := parser.Parse(line)
         if err != nil {
             if errors.Is(err, io.EOF) {
                 return nil, nil
             }
             return nil, err
         }
-
-        if m != nil {
-            return []telegraf.Metric{m}, nil
-        }
-
-        return []telegraf.Metric{}, nil
+        return m, err
     default:
         return parser.Parse(line)
     }
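Why calling Parse on every line works now: the CSV parser side of this PR (presumably among the changed files not shown on this page) moves the skip-row and header-row bookkeeping into the parser itself, so it no longer assumes that the first Parse call sees the header row. The sketch below only illustrates that idea under that assumption: the type, field names, and method are invented here and are not the real csv.Parser internals.

package main

import (
    "fmt"
    "strings"
)

// toyCSVParser is an illustrative stand-in for the idea behind the fix: the
// parser itself remembers how many skip rows and header rows it still has to
// consume, so callers can feed it one line at a time without special-casing
// the first line of a file. It is NOT the real csv.Parser.
type toyCSVParser struct {
    skipRows       int
    headerRowCount int
    columns        []string
}

// FeedLine consumes one physical line and returns a column->value map once
// all skip and header rows have been seen, or nil otherwise.
func (p *toyCSVParser) FeedLine(line string) map[string]string {
    if p.skipRows > 0 {
        p.skipRows--
        return nil
    }
    if p.headerRowCount > 0 {
        p.headerRowCount--
        for i, name := range strings.Split(line, ",") {
            if i < len(p.columns) {
                p.columns[i] += name // extra header rows concatenate column-wise
            } else {
                p.columns = append(p.columns, name)
            }
        }
        return nil
    }
    row := map[string]string{}
    for i, value := range strings.Split(line, ",") {
        if i < len(p.columns) {
            row[p.columns[i]] = value
        }
    }
    return row
}

func main() {
    p := &toyCSVParser{skipRows: 2, headerRowCount: 1}
    for _, line := range []string{
        "garbage nonsense 1",
        "garbage,nonsense,2",
        "line1,line2,line3",
        "hello,80,test_name2",
    } {
        if row := p.FeedLine(line); row != nil {
            fmt.Println(row) // map[line1:hello line2:80 line3:test_name2]
        }
    }
}

With state like this inside the parser, the plugin-side change above reduces to "call Parse on every line and drop io.EOF results", with no firstLine special case.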
224 changes: 222 additions & 2 deletions plugins/inputs/directory_monitor/directory_monitor_test.go
@@ -3,12 +3,11 @@ package directory_monitor
 import (
     "bytes"
     "compress/gzip"
+    "github.com/stretchr/testify/require"
     "os"
     "path/filepath"
     "testing"

-    "github.com/stretchr/testify/require"
-
     "github.com/influxdata/telegraf/plugins/parsers"
     "github.com/influxdata/telegraf/testutil"
 )
@@ -193,3 +192,224 @@ func TestFileTag(t *testing.T) {
}
}
}

func TestCSVNoSkipRows(t *testing.T) {
acc := testutil.Accumulator{}
testCsvFile := "test.csv"

// Establish process directory and finished directory.
finishedDirectory, err := os.MkdirTemp("", "finished")
require.NoError(t, err)
processDirectory, err := os.MkdirTemp("", "test")
require.NoError(t, err)
defer os.RemoveAll(processDirectory)
defer os.RemoveAll(finishedDirectory)

// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
}
err = r.Init()
require.NoError(t, err)

parserConfig := parsers.Config{
DataFormat: "csv",
CSVHeaderRowCount: 1,
CSVSkipRows: 0,
CSVTagColumns: []string{"line1"},
}
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
})
r.Log = testutil.Logger{}

testCSV := `line1,line2,line3
hello,80,test_name2`

expectedFields := map[string]interface{}{
"line2": int64(80),
"line3": "test_name2",
}

// Write the CSV file to be processed into the 'process' directory.
f, err := os.Create(filepath.Join(processDirectory, testCsvFile))
require.NoError(t, err)
_, err = f.WriteString(testCSV)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)

// Start the plugin and let it pick up and parse the file.
err = r.Start(&acc)
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(1)
r.Stop()

// Verify that the single data row was parsed into exactly one metric.
require.Equal(t, len(acc.Metrics), 1)

// The file should have been moved to the 'finished' directory, as configured.
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile))
require.NoError(t, err)
for _, m := range acc.Metrics {
for key, value := range m.Tags {
require.Equal(t, "line1", key)
require.Equal(t, "hello", value)
}
require.Equal(t, expectedFields, m.Fields)
}
}

func TestCSVSkipRows(t *testing.T) {
acc := testutil.Accumulator{}
testCsvFile := "test.csv"

// Establish process directory and finished directory.
finishedDirectory, err := os.MkdirTemp("", "finished")
require.NoError(t, err)
processDirectory, err := os.MkdirTemp("", "test")
require.NoError(t, err)
defer os.RemoveAll(processDirectory)
defer os.RemoveAll(finishedDirectory)

// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
}
err = r.Init()
require.NoError(t, err)

parserConfig := parsers.Config{
DataFormat: "csv",
CSVHeaderRowCount: 1,
CSVSkipRows: 2,
CSVTagColumns: []string{"line1"},
}
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
})
r.Log = testutil.Logger{}

testCSV := `garbage nonsense 1
garbage,nonsense,2
line1,line2,line3
hello,80,test_name2`

expectedFields := map[string]interface{}{
"line2": int64(80),
"line3": "test_name2",
}

// Write the CSV file to be processed into the 'process' directory.
f, err := os.Create(filepath.Join(processDirectory, testCsvFile))
require.NoError(t, err)
_, err = f.WriteString(testCSV)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)

// Start the plugin and let it pick up and parse the file.
err = r.Start(&acc)
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(1)
r.Stop()

// Verify that the single data row was parsed into exactly one metric.
require.Equal(t, len(acc.Metrics), 1)

// The file should have been moved to the 'finished' directory, as configured.
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile))
require.NoError(t, err)
for _, m := range acc.Metrics {
for key, value := range m.Tags {
require.Equal(t, "line1", key)
require.Equal(t, "hello", value)
}
require.Equal(t, expectedFields, m.Fields)
}
}

func TestCSVMultiHeader(t *testing.T) {
acc := testutil.Accumulator{}
testCsvFile := "test.csv"

// Establish process directory and finished directory.
finishedDirectory, err := os.MkdirTemp("", "finished")
require.NoError(t, err)
processDirectory, err := os.MkdirTemp("", "test")
require.NoError(t, err)
defer os.RemoveAll(processDirectory)
defer os.RemoveAll(finishedDirectory)

// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
}
err = r.Init()
require.NoError(t, err)

parserConfig := parsers.Config{
DataFormat: "csv",
CSVHeaderRowCount: 2,
CSVTagColumns: []string{"line1"},
}
require.NoError(t, err)
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
})
r.Log = testutil.Logger{}

testCSV := `line,line,line
1,2,3
hello,80,test_name2`
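// The two header rows above are concatenated column by column when
// CSVHeaderRowCount is 2 ("line"+"1" -> "line1", and so on), which is why the
// expected fields below are keyed line2/line3 and the tag column is line1.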

expectedFields := map[string]interface{}{
"line2": int64(80),
"line3": "test_name2",
}

// Write the CSV file to be processed into the 'process' directory.
f, err := os.Create(filepath.Join(processDirectory, testCsvFile))
require.NoError(t, err)
_, err = f.WriteString(testCSV)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)

// Start the plugin and let it pick up and parse the file.
err = r.Start(&acc)
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(1)
r.Stop()

// Verify that the single data row was parsed into exactly one metric.
require.Equal(t, len(acc.Metrics), 1)

// The file should have been moved to the 'finished' directory, as configured.
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile))
require.NoError(t, err)
for _, m := range acc.Metrics {
for key, value := range m.Tags {
require.Equal(t, "line1", key)
require.Equal(t, "hello", value)
}
require.Equal(t, expectedFields, m.Fields)
}
}
25 changes: 7 additions & 18 deletions plugins/inputs/tail/tail.go
@@ -288,25 +288,17 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
 }

 // ParseLine parses a line of text.
-func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.Metric, error) {
+func parseLine(parser parsers.Parser, line string) ([]telegraf.Metric, error) {
     switch parser.(type) {
     case *csv.Parser:
-        // The csv parser parses headers in Parse and skips them in ParseLine.
-        // As a temporary solution call Parse only when getting the first
-        // line from the file.
-        if firstLine {
-            return parser.Parse([]byte(line))
-        }
-
-        m, err := parser.ParseLine(line)
+        m, err := parser.Parse([]byte(line))
         if err != nil {
             if errors.Is(err, io.EOF) {
                 return nil, nil
             }
             return nil, err
         }
-
-        if m != nil {
-            return []telegraf.Metric{m}, nil
-        }
-        return []telegraf.Metric{}, nil
+        return m, err
     default:
         return parser.Parse([]byte(line))
     }
@@ -315,8 +307,6 @@ func parseLine(parser parsers.Parser, line string, firstLine bool) ([]telegraf.M
 // Receiver is launched as a goroutine to continuously watch a tailed logfile
 // for changes, parse any incoming msgs, and add to the accumulator.
 func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
-    var firstLine = true
-
     // holds the individual lines of multi-line log entries.
     var buffer bytes.Buffer

@@ -378,13 +368,12 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
             continue
         }

-        metrics, err := parseLine(parser, text, firstLine)
+        metrics, err := parseLine(parser, text)
         if err != nil {
             t.Log.Errorf("Malformed log line in %q: [%q]: %s",
                 tailer.Filename, text, err.Error())
             continue
         }
-        firstLine = false

         if t.PathTag != "" {
             for _, metric := range metrics {