Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs/directory_monitor): Add support for multiline file parsing #11234

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions plugins/inputs/directory_monitor/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Directory Monitor Input Plugin

This plugin monitors a single directory (without looking at sub-directories), and takes in each file placed in the directory.
The plugin will gather all files in the directory at a configurable interval (`monitor_interval`), and parse the ones that haven't been picked up yet.
The plugin will gather all files in the directory at the configured interval, and parse the ones that haven't been picked up yet.

This plugin is intended to read files that are moved or copied to the monitored directory, and thus files should also not be used by another process or else they may fail to be gathered. Please be advised that this plugin pulls files directly after they've been in the directory for the length of the configurable `directory_duration_threshold`, and thus files should not be written 'live' to the monitored directory. If you absolutely must write files directly, they must be guaranteed to finish writing before the `directory_duration_threshold`.

Expand Down Expand Up @@ -46,10 +46,13 @@ This plugin is intended to read files that are moved or copied to the monitored
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: line-by-line, complete-file
Hipska marked this conversation as resolved.
Show resolved Hide resolved
# parse_method = "line-by-line"
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
data_format = "influx"
```
59 changes: 46 additions & 13 deletions plugins/inputs/directory_monitor/directory_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
Expand All @@ -36,6 +37,7 @@ var (
defaultMaxBufferedMetrics = 10000
defaultDirectoryDurationThreshold = config.Duration(0 * time.Millisecond)
defaultFileQueueSize = 100000
defaultParseMethod = "line-by-line"
)

type DirectoryMonitor struct {
Expand All @@ -50,6 +52,7 @@ type DirectoryMonitor struct {
DirectoryDurationThreshold config.Duration `toml:"directory_duration_threshold"`
Log telegraf.Logger `toml:"-"`
FileQueueSize int `toml:"file_queue_size"`
ParseMethod string `toml:"parse_method"`

filesInUse sync.Map
cancel context.CancelFunc
Expand Down Expand Up @@ -200,7 +203,7 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {

parser, err := monitor.parserFunc()
if err != nil {
return fmt.Errorf("E! Creating parser: %s", err.Error())
return fmt.Errorf("creating parser: %w", err)
}

// Handle gzipped files.
Expand All @@ -218,41 +221,66 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
}

func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
metrics, err := monitor.parseLine(parser, scanner.Bytes())
if monitor.ParseMethod == "complete-file" {
bytes, err := io.ReadAll(reader)
if err != nil {
return err
}

if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))
}
metrics, err := monitor.parseMetrics(parser, bytes, fileName)
if err != nil {
return err
}

if err := monitor.sendMetrics(metrics); err != nil {
return err
}
}

return nil
scanner := bufio.NewScanner(reader)
/* To be used when we add a new scanner based parse method
switch monitor.ParseMethod {
case "line-by-line":
scanner.Split(bufio.ScanLines)
case "future-split-type":
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { return })
} */
Hipska marked this conversation as resolved.
Show resolved Hide resolved

for scanner.Scan() {
metrics, err := monitor.parseMetrics(parser, scanner.Bytes(), fileName)
if err != nil {
return err
}

if err := monitor.sendMetrics(metrics); err != nil {
return err
}
}

return scanner.Err()
}

func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte) ([]telegraf.Metric, error) {
func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) {
switch parser.(type) {
case *csv.Parser:
m, err := parser.Parse(line)
metrics, err = parser.Parse(line)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
}
return m, err
default:
return parser.Parse(line)
metrics, err = parser.Parse(line)
}

if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))
}
}

return metrics, err
}

func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error {
Expand Down Expand Up @@ -357,6 +385,10 @@ func (monitor *DirectoryMonitor) Init() error {
monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex)
}

if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "complete-file"}); err != nil {
return fmt.Errorf("config option parse_method: %w", err)
}

return nil
}

Expand All @@ -368,6 +400,7 @@ func init() {
MaxBufferedMetrics: defaultMaxBufferedMetrics,
DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
Hipska marked this conversation as resolved.
Show resolved Hide resolved
}
})
}
82 changes: 70 additions & 12 deletions plugins/inputs/directory_monitor/directory_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func TestCSVGZImport(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -91,8 +92,9 @@ func TestMultipleJSONFileImports(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 1000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -140,8 +142,9 @@ func TestFileTag(t *testing.T) {
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
FileTag: "filename",
MaxBufferedMetrics: 1000,
FileQueueSize: 1000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -194,8 +197,9 @@ func TestCSVNoSkipRows(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -262,8 +266,9 @@ func TestCSVSkipRows(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -332,8 +337,9 @@ func TestCSVMultiHeader(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
Expand Down Expand Up @@ -387,3 +393,55 @@ hello,80,test_name2`
require.Equal(t, expectedFields, m.Fields)
}
}

func TestParseCompleteFile(t *testing.T) {
acc := testutil.Accumulator{}

// Establish process directory and finished directory.
finishedDirectory := t.TempDir()
processDirectory := t.TempDir()

// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: "complete-file",
}
err := r.Init()
require.NoError(t, err)
r.Log = testutil.Logger{}

parserConfig := parsers.Config{
DataFormat: "json",
JSONNameKey: "name",
TagKeys: []string{"tag1"},
}

r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
})

testJSON := `{
"name": "test1",
"value": 100.1,
"tag1": "value1"
}`

// Write json file to process into the 'process' directory.
f, _ := os.CreateTemp(processDirectory, "test.json")
_, _ = f.WriteString(testJSON)
_ = f.Close()

err = r.Start(&acc)
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(1)
r.Stop()

require.NoError(t, acc.FirstError())
require.Len(t, acc.Metrics, 1)
testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())
}
5 changes: 4 additions & 1 deletion plugins/inputs/directory_monitor/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: line-by-line, complete-file
# parse_method = "line-by-line"
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
data_format = "influx"