Skip to content

Commit b6b5e1a

Browse files
committed
#5 bugfixes and new tests for the multiple logfile tailer
1 parent f0aa540 commit b6b5e1a

File tree

7 files changed

+89
-16
lines changed

7 files changed

+89
-16
lines changed

tailer/bufferedTailer.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
type bufferedTailer struct {
2424
out chan *fswatcher.Line
2525
orig fswatcher.FileTailer
26+
done chan struct{}
2627
}
2728

2829
func (b *bufferedTailer) Lines() chan *fswatcher.Line {
@@ -35,6 +36,7 @@ func (b *bufferedTailer) Errors() chan fswatcher.Error {
3536

3637
func (b *bufferedTailer) Close() {
3738
b.orig.Close()
39+
close(b.done)
3840
}
3941

4042
func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
@@ -103,6 +105,7 @@ func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
103105
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric, log logrus.FieldLogger, maxLinesInBuffer int) fswatcher.FileTailer {
104106
buffer := NewLineBuffer()
105107
out := make(chan *fswatcher.Line)
108+
done := make(chan struct{})
106109

107110
// producer
108111
go func() {
@@ -135,12 +138,16 @@ func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric Buffe
135138
return
136139
}
137140
bufferLoadMetric.Dec()
138-
out <- line
141+
select {
142+
case out <- line:
143+
case <-done:
144+
}
139145
}
140146
}()
141147
return &bufferedTailer{
142148
out: out,
143149
orig: orig,
150+
done: done,
144151
}
145152
}
146153

tailer/fswatcher/file_darwin.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package fswatcher
1717
import (
1818
"io"
1919
"os"
20+
"syscall"
2021
)
2122

2223
// On macOS, we keep dirs open, so we use *os.File.
@@ -44,8 +45,12 @@ func (d *Dir) ls() ([]os.FileInfo, Error) {
4445
return fileInfos, nil
4546
}
4647

47-
func NewFile(orig *os.File, newPath string) *os.File {
48-
return os.NewFile(orig.Fd(), newPath)
48+
func NewFile(orig *os.File, newPath string) (*os.File, error) {
49+
fd, err := syscall.Dup(int(orig.Fd()))
50+
if err != nil {
51+
return nil, err
52+
}
53+
return os.NewFile(uintptr(fd), newPath), nil
4954
}
5055

5156
func open(path string) (*os.File, error) {

tailer/fswatcher/file_linux.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
package fswatcher
1616

17-
import "os"
17+
import (
18+
"os"
19+
"syscall"
20+
)
1821

1922
// On Linux, we don't need to keep the directory open, but we need to keep an open watch descriptor handle.
2023
type Dir struct {
@@ -45,8 +48,13 @@ func (d *Dir) ls() ([]os.FileInfo, Error) {
4548
return fileInfos, nil
4649
}
4750

48-
func NewFile(orig *os.File, newPath string) *os.File {
49-
return os.NewFile(orig.Fd(), newPath)
51+
func NewFile(orig *os.File, newPath string) (*os.File, error) {
52+
// The finalizer will close orig.Fd() even if we don't close it explicitly. Therefore we must Dup().
53+
fd, err := syscall.Dup(int(orig.Fd()))
54+
if err != nil {
55+
return nil, err
56+
}
57+
return os.NewFile(uintptr(fd), newPath), nil
5058
}
5159

5260
func open(path string) (*os.File, error) {

tailer/fswatcher/file_windows.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ func (d *Dir) ls() ([]*fileInfo, Error) {
7171
}
7272

7373
// like os.NewFile()
74-
func NewFile(orig *File, newPath string) *File {
74+
func NewFile(orig *File, newPath string) (*File, error) {
7575
return &File{
7676
path: newPath,
7777
currentPos: orig.currentPos,
7878
fileIndexLow: orig.fileIndexLow,
7979
fileIndexHigh: orig.fileIndexHigh,
80-
}
80+
}, nil
8181
}
8282

8383
func open(path string) (*File, Error) {

tailer/fswatcher/fseventProducerLoop_linux.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"fmt"
1919
"strings"
2020
"syscall"
21+
"time"
2122
"unsafe"
2223
)
2324

@@ -71,9 +72,15 @@ func runInotifyLoop(fd int) *inotifyloop {
7172
for {
7273
n, err = syscall.Read(l.fd, buf)
7374
if err != nil {
75+
// Getting an err might be part of the shutdown, when l.fd is closed.
76+
// We decide whether it is an actual error or not by checking if l.done is closed.
7477
select {
75-
case l.errors <- NewError(NotSpecified, err, "failed to read inotify events"):
7678
case <-l.done:
79+
case <-time.After(2 * time.Second):
80+
select {
81+
case l.errors <- NewError(NotSpecified, err, "failed to read inotify events"):
82+
case <-l.done:
83+
}
7784
}
7885
return
7986
}

tailer/fswatcher/fswatcher.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ func runFileTailer(initFunc func() (fswatcher, Error), globs []glob.Glob, readal
150150
select {
151151
case <-t.done:
152152
case t.errors <- Err:
153-
return
154153
}
154+
return
155155
}
156156
}
157157

@@ -204,7 +204,7 @@ func (t *fileTailer) shutdown() {
204204
close(t.errors)
205205

206206
warnf := func(format string, args ...interface{}) {
207-
log.Warnf("error while shutting down the file system watcher: %v", fmt.Sprint(format, args))
207+
log.Warnf("error while shutting down the file system watcher: %v", fmt.Sprintf(format, args))
208208
}
209209

210210
for _, dir := range t.watchedDirs {
@@ -278,7 +278,11 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg
278278
if alreadyWatched != nil {
279279
if alreadyWatched.file.Name() != filePath {
280280
fileLogger.WithField("fd", alreadyWatched.file.Fd()).Infof("file was moved from %v", alreadyWatched.file.Name())
281-
alreadyWatched.file = NewFile(alreadyWatched.file, filePath)
281+
oldFileWithNewPath, err := NewFile(alreadyWatched.file, filePath)
282+
if err != nil {
283+
return NewErrorf(NotSpecified, err, "%v: failed to follow moved file", filePath)
284+
}
285+
alreadyWatched.file = oldFileWithNewPath
282286
} else {
283287
fileLogger.Debug("skipping, because file is already watched")
284288
}
@@ -287,7 +291,12 @@ func (t *fileTailer) syncFilesInDir(dir *Dir, readall bool, log logrus.FieldLogg
287291
}
288292
newFile, err := open(filePath)
289293
if err != nil {
290-
return NewErrorf(NotSpecified, err, "%v: failed to open file", filePath)
294+
if os.IsNotExist(err) {
295+
fileLogger.Debug("skipping, because file does no longer exist")
296+
continue
297+
} else {
298+
return NewErrorf(NotSpecified, err, "%v: failed to open file", filePath)
299+
}
291300
}
292301
if !readall {
293302
_, err = newFile.Seek(0, io.SeekEnd)

tailer/fswatcher_test.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,38 @@ const tests = `
102102
- [expect, test line 3 dir 1 file 2, logdir1/logfile-2.log]
103103
- [expect, test line 3 dir 2 file 1, logdir2/logfile-1.log]
104104
- [expect, test line 3 dir 2 file 2, logdir2/logfile-2.log]
105+
106+
- name: nested directories
107+
commands:
108+
- [mkdir, outer]
109+
- [mkdir, outer/inner]
110+
- [log, outer line 1, outer/logfile.log]
111+
- [log, inner line 1, outer/inner/logfile.log]
112+
- [start file tailer, readall=true, fail_on_missing_logfile=false, outer/*.log, outer/inner/*.log]
113+
- [expect, outer line 1, outer/logfile.log]
114+
- [expect, inner line 1, outer/inner/logfile.log]
115+
- [log, outer line 2, outer/logfile.log]
116+
- [log, inner line 2, outer/inner/logfile.log]
117+
- [expect, outer line 2, outer/logfile.log]
118+
- [expect, inner line 2, outer/inner/logfile.log]
119+
- [logrotate, outer/logfile.log, outer/logfile.log.1]
120+
- [logrotate, outer/inner/logfile.log, outer/inner/logfile.log.1]
121+
- [log, outer line 3, outer/logfile.log]
122+
- [log, inner line 3, outer/inner/logfile.log]
123+
- [expect, outer line 3, outer/logfile.log]
124+
- [expect, inner line 3, outer/inner/logfile.log]
125+
126+
- name: watch after logrotate
127+
commands:
128+
- [mkdir, logdir]
129+
- [log, line 1, logdir/logfile.log]
130+
- [start file tailer, readall=true, fail_on_missing_logfile=false, logdir/*]
131+
- [expect, line 1, logdir/logfile.log]
132+
- [log, line 2, logdir/logfile.log]
133+
- [expect, line 2, logdir/logfile.log]
134+
- [logrotate, logdir/logfile.log, logdir/logfile.log.1]
135+
- [log, line 3, logdir/logfile.log]
136+
- [expect, line 3, logdir/logfile.log]
105137
`
106138

107139
// // The following test fails on Windows in tearDown() when removing logdir.
@@ -209,14 +241,19 @@ func executeCommands(t *testing.T, ctx *context, cmds [][]string) {
209241
for _, cmd := range cmds {
210242
exec(t, ctx, cmd)
211243
}
212-
closeTailer(t, ctx)
244+
// The "watch after logrotate" test watches logdir/* and rotates logdir/logfile.log
245+
// to logdir/logfile.log.1. As a result, the file is still watched after it is rotated.
246+
// Depending on the logrotate config the lines are read again (cp) or not (mv).
247+
// We ignore unexpected lines for that test.
248+
// TODO: Make ignoreUnexpectedLines an explicit paramter in the test yaml instead of using the test name here.
249+
closeTailer(t, ctx, ctx.testName == "watch after logrotate")
213250
assertGoroutinesTerminated(t, ctx, nGoroutinesBefore)
214251
for _, writer := range ctx.logFileWriters {
215252
writer.close(t, ctx)
216253
}
217254
}
218255

219-
func closeTailer(t *testing.T, ctx *context) {
256+
func closeTailer(t *testing.T, ctx *context, ignoreUnexpectedLines bool) {
220257
// Note: This function checks if the Lines() channel gets closed.
221258
// While it's good to check this, it doesn't guarantee that the tailer is
222259
// fully shut down. There might be an fseventProducerLoop running in the
@@ -230,7 +267,7 @@ func closeTailer(t *testing.T, ctx *context) {
230267
// check if the lines channel gets closed
231268
select {
232269
case line, open := <-ctx.tailer.Lines():
233-
if open {
270+
if open && !ignoreUnexpectedLines {
234271
fatalf(t, ctx, "read unexpected line line from file %q: %q", line.File, line.Line)
235272
}
236273
case <-time.After(timeout):

0 commit comments

Comments
 (0)