Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
Make Logparser Plugin Check For New Files (influxdata#2141)
Browse files Browse the repository at this point in the history
* Make Logparser Plugin Check For New Files

Check in the Gather metric to see if any new files matching the glob
have appeared. If so, start tailing them from the beginning.

* changelog update for influxdata#2141
  • Loading branch information
njwhite authored and mlinde201 committed Feb 6, 2017
1 parent 47d15ce commit 335a7d7
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ It is highly recommended that all users migrate to the new riemann output plugin

### Features

- [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files.
- [#2137](https://github.com/influxdata/telegraf/pull/2137): Added userstats to mysql input plugin.
- [#2179](https://github.com/influxdata/telegraf/pull/2179): Added more InnoDB metric to MySQL plugin.
- [#2251](https://github.com/influxdata/telegraf/pull/2251): InfluxDB output: use own client for improved through-put and less allocations.
Expand Down
36 changes: 29 additions & 7 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LogParserPlugin struct {
Files []string
FromBeginning bool

tailers []*tail.Tail
tailers map[string]*tail.Tail
lines chan string
done chan struct{}
wg sync.WaitGroup
Expand All @@ -46,7 +46,9 @@ const sampleConfig = `
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["/var/log/apache/access.log"]
## Read file from beginning.
## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
from_beginning = false
## Parse logstash-style "grok" patterns:
Expand Down Expand Up @@ -77,7 +79,11 @@ func (l *LogParserPlugin) Description() string {
}

func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error {
return nil
l.Lock()
defer l.Unlock()

// always start from the beginning of files that appear while we're running
return l.tailNewfiles(true)
}

func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
Expand All @@ -87,6 +93,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.acc = acc
l.lines = make(chan string, 1000)
l.done = make(chan struct{})
l.tailers = make(map[string]*tail.Tail)

// Looks for fields which implement LogParser interface
l.parsers = []LogParser{}
Expand Down Expand Up @@ -121,14 +128,22 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
return err
}

l.wg.Add(1)
go l.parser()

return l.tailNewfiles(l.FromBeginning)
}

// check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held!
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
var seek tail.SeekInfo
if !l.FromBeginning {
if !fromBeginning {
seek.Whence = 2
seek.Offset = 0
}

l.wg.Add(1)
go l.parser()
errChan := errchan.New(len(l.Files))

// Create a "tailer" for each file
for _, filepath := range l.Files {
Expand All @@ -139,7 +154,13 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}
files := g.Match()
errChan = errchan.New(len(files))

for file, _ := range files {
if _, ok := l.tailers[file]; ok {
// we're already tailing this file
continue
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Expand All @@ -152,7 +173,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
l.tailers = append(l.tailers, tailer)
l.tailers[file] = tailer
}
}

Expand All @@ -166,6 +187,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {

var line *tail.Line
for line = range tailer.Lines {

if line.Err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, line.Err)
Expand Down
43 changes: 43 additions & 0 deletions plugins/inputs/logparser/logparser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package logparser

import (
"io/ioutil"
"os"
"runtime"
"strings"
"testing"
Expand Down Expand Up @@ -80,6 +82,47 @@ func TestGrokParseLogFiles(t *testing.T) {
map[string]string{})
}

func TestGrokParseLogFilesAppearLater(t *testing.T) {
emptydir, err := ioutil.TempDir("", "TestGrokParseLogFilesAppearLater")
defer os.RemoveAll(emptydir)
assert.NoError(t, err)

thisdir := getCurrentDir()
p := &grok.Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatternFiles: []string{thisdir + "grok/testdata/test-patterns"},
}

logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{emptydir + "/*.log"},
GrokParser: p,
}

acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc))

time.Sleep(time.Millisecond * 500)
assert.Equal(t, acc.NFields(), 0)

os.Symlink(
thisdir+"grok/testdata/test_a.log",
emptydir+"/test_a.log")
assert.NoError(t, logparser.Gather(&acc))
time.Sleep(time.Millisecond * 500)

logparser.Stop()

acc.AssertContainsTaggedFields(t, "logparser_grok",
map[string]interface{}{
"clientip": "192.168.1.1",
"myfloat": float64(1.25),
"response_time": int64(5432),
"myint": int64(101),
},
map[string]string{"response_code": "200"})
}

// Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) {
Expand Down

0 comments on commit 335a7d7

Please sign in to comment.