From 3f8963ce018f4cc7c6b5792727ed3625e63f027a Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 15 Apr 2019 17:18:36 -0400 Subject: [PATCH] Updating hpcloud tail library to forked version modified for our purposes. Changes to the library move the handling of symlinks and re-opening to the tail library. fsnotify events are only used now to start new tailers for files not previously seen. --- Gopkg.lock | 7 +- Gopkg.toml | 5 + pkg/promtail/promtail_test.go | 12 -- pkg/promtail/targets/filetarget.go | 21 +-- pkg/promtail/targets/tailer.go | 51 +++---- vendor/github.com/hpcloud/tail/.travis.yml | 5 +- vendor/github.com/hpcloud/tail/README.md | 2 +- .../hpcloud/tail/ratelimiter/memory.go | 10 +- vendor/github.com/hpcloud/tail/tail.go | 124 +++++++++++++----- vendor/github.com/hpcloud/tail/tail_posix.go | 14 +- .../hpcloud/tail/watch/filechanges.go | 2 +- .../github.com/hpcloud/tail/watch/inotify.go | 11 +- .../hpcloud/tail/watch/inotify_tracker.go | 88 ++++++------- 13 files changed, 191 insertions(+), 161 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index aba7a417a119..1eff4874626a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -509,7 +509,8 @@ version = "v0.8.1" [[projects]] - digest = "1:a1038ef593beb4771c8f0f9c26e8b00410acd800af5c6864651d9bf160ea1813" + branch = "master" + digest = "1:d7eb20f276c32cfcd395d013f94d236c25d9f2ed4cf9108279e07e7544279f21" name = "github.com/hpcloud/tail" packages = [ ".", @@ -519,8 +520,8 @@ "winfile", ] pruneopts = "UT" - revision = "a30252cb686a21eb2d0b98132633053ec2f7f1e5" - version = "v1.0.0" + revision = "b3c6c4808227e5772299d6827851c3a6ed98baef" + source = "github.com/slim-bean/tail" [[projects]] digest = "1:bb81097a5b62634f3e9fec1014657855610c82d19b9a40c17612e32651e35dca" diff --git a/Gopkg.toml b/Gopkg.toml index b22c562487aa..954d7b032ba2 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -65,3 +65,8 @@ [prune] go-tests = true unused-packages = true + +[[constraint]] + name = "github.com/hpcloud/tail" + source = "github.com/slim-bean/tail" + branch = "master" diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index 4db825dec56d..9936500fc8fd 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -180,9 +180,6 @@ func fileRoll(t *testing.T, filename string, prefix string) int { time.Sleep(1 * time.Millisecond) } - //FIXME this is a hack to make sure the hpcloud tail polling implementation reads all lines before we roll the file - time.Sleep(300 * time.Millisecond) - if err = os.Rename(filename, filename+".1"); err != nil { t.Fatal("Failed to rename file for test: ", err) } @@ -199,9 +196,6 @@ func fileRoll(t *testing.T, filename string, prefix string) int { time.Sleep(1 * time.Millisecond) } - //FIXME this is a hack to make sure the hpcloud tail polling implementation reads all lines before we roll the file - time.Sleep(300 * time.Millisecond) - return 200 } @@ -231,9 +225,6 @@ func symlinkRoll(t *testing.T, testDir string, filename string, prefix string) i time.Sleep(1 * time.Millisecond) } - //FIXME this is a hack to make sure the hpcloud tail polling implementation reads all lines before we roll the file - time.Sleep(300 * time.Millisecond) - // Remove the link, make a new file, link to the new file. if err := os.Remove(filename); err != nil { t.Fatal(err) @@ -255,9 +246,6 @@ func symlinkRoll(t *testing.T, testDir string, filename string, prefix string) i time.Sleep(1 * time.Millisecond) } - //FIXME this is a hack to make sure the hpcloud tail polling implementation reads all lines before we roll the file - time.Sleep(300 * time.Millisecond) - return 200 } diff --git a/pkg/promtail/targets/filetarget.go b/pkg/promtail/targets/filetarget.go index 759b052739aa..23978151e46f 100644 --- a/pkg/promtail/targets/filetarget.go +++ b/pkg/promtail/targets/filetarget.go @@ -135,13 +135,6 @@ func (t *FileTarget) run() { case event := <-t.watcher.Events: switch event.Op { case fsnotify.Create: - // If the file was a symlink we don't get a Remove notification if the symlink resolves to a non watched directory. - // Close and re-open the tailer to make sure we tail the new file. - if tailer, ok := t.tails[event.Name]; ok { - level.Info(t.logger).Log("msg", "create for file being tailed. Will close and re-open", "filename", event.Name) - helpers.LogError("stopping tailer", tailer.stop) - delete(t.tails, event.Name) - } matched, err := filepath.Match(t.path, event.Name) if err != nil { level.Error(t.logger).Log("msg", "failed to match file", "error", err, "filename", event.Name) @@ -152,11 +145,6 @@ func (t *FileTarget) run() { continue } t.startTailing([]string{event.Name}) - case fsnotify.Remove: - t.stopTailing([]string{event.Name}) - case fsnotify.Rename: - // Rename is only issued on the original file path; the new name receives a Create event - t.stopTailing([]string{event.Name}) default: level.Debug(t.logger).Log("msg", "got unknown event", "event", event) } @@ -188,7 +176,7 @@ func (t *FileTarget) sync() error { } // Record the size of all the files matched by the Glob pattern. - t.updateTotalBytesMetric(matches) + matches = t.reportSizeAndRemoveMissing(matches) // Get the current unique set of dirs to watch. dirs := map[string]struct{}{} @@ -298,15 +286,18 @@ func toStopTailing(nt []string, et map[string]*tailer) []string { return ta } -func (t *FileTarget) updateTotalBytesMetric(ms []string) { +func (t *FileTarget) reportSizeAndRemoveMissing(ms []string) []string { + mso := ms[:0] for _, m := range ms { fi, err := os.Stat(m) if err != nil { - level.Error(t.logger).Log("msg", "failed to stat matched file, cannot report size", m, "error", err) + //If we can't stat the file, skip it continue } + mso = append(mso, m) totalBytes.WithLabelValues(m).Set(float64(fi.Size())) } + return mso } // Returns the elements from set b which are missing from set a diff --git a/pkg/promtail/targets/tailer.go b/pkg/promtail/targets/tailer.go index 18156fa3507e..804ef1fe750b 100644 --- a/pkg/promtail/targets/tailer.go +++ b/pkg/promtail/targets/tailer.go @@ -18,50 +18,30 @@ type tailer struct { handler api.EntryHandler positions *positions.Positions - path string - filename string - tail *tail.Tail + path string + tail *tail.Tail quit chan struct{} done chan struct{} } func newTailer(logger log.Logger, handler api.EntryHandler, positions *positions.Positions, path string) (*tailer, error) { - filename := path - var reOpen bool - - // Check if the path requested is a symbolic link - fi, err := os.Lstat(path) - if err != nil { - return nil, err - } - if fi.Mode()&os.ModeSymlink == os.ModeSymlink { - filename, err = os.Readlink(path) - if err != nil { - return nil, err - } - - // if we are tailing a symbolic link then we need to automatically re-open - // as we wont get a Create event when a file is rotated. - reOpen = true - } - // Simple check to make sure the file we are tailing doesn't // have a position already saved which is past the end of the file. - fi, err = os.Stat(filename) + fi, err := os.Stat(path) if err != nil { return nil, err } - if fi.Size() < positions.Get(filename) { - positions.Remove(filename) + if fi.Size() < positions.Get(path) { + positions.Remove(path) } - tail, err := tail.TailFile(filename, tail.Config{ + tail, err := tail.TailFile(path, tail.Config{ Follow: true, Poll: true, - ReOpen: reOpen, + ReOpen: true, Location: &tail.SeekInfo{ - Offset: positions.Get(filename), + Offset: positions.Get(path), Whence: 0, }, }) @@ -74,11 +54,10 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions *positions handler: api.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler), positions: positions, - path: path, - filename: filename, - tail: tail, - quit: make(chan struct{}), - done: make(chan struct{}), + path: path, + tail: tail, + quit: make(chan struct{}), + done: make(chan struct{}), } go tailer.run() filesActive.Add(1.) @@ -131,8 +110,8 @@ func (t *tailer) markPosition() error { } readBytes.WithLabelValues(t.path).Set(float64(pos)) - level.Debug(t.logger).Log("path", t.path, "filename", t.filename, "current_position", pos) - t.positions.Put(t.filename, pos) + level.Debug(t.logger).Log("path", t.path, "current_position", pos) + t.positions.Put(t.path, pos) return nil } @@ -151,5 +130,5 @@ func (t *tailer) stop() error { } func (t *tailer) cleanup() { - t.positions.Remove(t.filename) + t.positions.Remove(t.path) } diff --git a/vendor/github.com/hpcloud/tail/.travis.yml b/vendor/github.com/hpcloud/tail/.travis.yml index 9cf8bb7fc5fa..ad8971f8b3f3 100644 --- a/vendor/github.com/hpcloud/tail/.travis.yml +++ b/vendor/github.com/hpcloud/tail/.travis.yml @@ -4,9 +4,10 @@ script: - go test -race -v ./... go: - - 1.4 - 1.5 - 1.6 + - 1.7 + - 1.8 - tip matrix: @@ -14,5 +15,5 @@ matrix: - go: tip install: - - go get gopkg.in/fsnotify.v1 + - go get gopkg.in/fsnotify/fsnotify.v1 - go get gopkg.in/tomb.v1 diff --git a/vendor/github.com/hpcloud/tail/README.md b/vendor/github.com/hpcloud/tail/README.md index fb7fbc26c677..ed8bd9ac38e8 100644 --- a/vendor/github.com/hpcloud/tail/README.md +++ b/vendor/github.com/hpcloud/tail/README.md @@ -1,5 +1,5 @@ [![Build Status](https://travis-ci.org/hpcloud/tail.svg)](https://travis-ci.org/hpcloud/tail) -[![Build status](https://ci.appveyor.com/api/projects/status/kohpsf3rvhjhrox6?svg=true)](https://ci.appveyor.com/project/HelionCloudFoundry/tail) +[![Build status](https://ci.appveyor.com/api/projects/status/vrl3paf9md0a7bgk/branch/master?svg=true)](https://ci.appveyor.com/project/Nino-K/tail/branch/master) # Go package for tail-ing files diff --git a/vendor/github.com/hpcloud/tail/ratelimiter/memory.go b/vendor/github.com/hpcloud/tail/ratelimiter/memory.go index 8f6a5784a9a2..bf3c2131b1e9 100644 --- a/vendor/github.com/hpcloud/tail/ratelimiter/memory.go +++ b/vendor/github.com/hpcloud/tail/ratelimiter/memory.go @@ -5,7 +5,10 @@ import ( "time" ) -const GC_SIZE int = 100 +const ( + GC_SIZE int = 100 + GC_PERIOD time.Duration = 60 * time.Second +) type Memory struct { store map[string]LeakyBucket @@ -44,11 +47,10 @@ func (m *Memory) GarbageCollect() { now := time.Now() // rate limit GC to once per minute - if now.Add(60*time.Second).Unix() > m.lastGCCollected.Unix() { - + if now.Unix() >= m.lastGCCollected.Add(GC_PERIOD).Unix() { for key, bucket := range m.store { // if the bucket is drained, then GC - if bucket.DrainedAt().Unix() > now.Unix() { + if bucket.DrainedAt().Unix() < now.Unix() { delete(m.store, key) } } diff --git a/vendor/github.com/hpcloud/tail/tail.go b/vendor/github.com/hpcloud/tail/tail.go index 2d252d605727..269af5149b01 100644 --- a/vendor/github.com/hpcloud/tail/tail.go +++ b/vendor/github.com/hpcloud/tail/tail.go @@ -22,7 +22,7 @@ import ( ) var ( - ErrStop = fmt.Errorf("tail should now stop") + ErrStop = errors.New("tail should now stop") ) type Line struct { @@ -143,7 +143,7 @@ func (tail *Tail) Tell() (offset int64, err error) { if tail.file == nil { return } - offset, err = tail.file.Seek(0, os.SEEK_CUR) + offset, err = tail.file.Seek(0, io.SeekCurrent) if err != nil { return } @@ -184,7 +184,19 @@ func (tail *Tail) closeFile() { } } -func (tail *Tail) reopen() error { +func (tail *Tail) reopen(truncated bool) error { + + // There are cases where the file is reopened so quickly it's still the same file + // which causes the poller to hang on an open file handle to a file no longer being written to + // and which eventually gets deleted. Save the current file handle info to make sure we only + // start tailing a different file. + cf, err := tail.file.Stat() + if !truncated && err != nil { + log.Print("stat of old file returned, this is not expected and may result in unexpected behavior") + // We don't action on this error but are logging it, not expecting to see it happen and not sure if we + // need to action on it, cf is checked for nil later on to accommodate this + } + tail.closeFile() for { var err error @@ -202,6 +214,30 @@ func (tail *Tail) reopen() error { } return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err) } + + // File exists and is opened, get information about it. + nf, err := tail.file.Stat() + if err != nil { + tail.Logger.Print("Failed to stat new file to be tailed, will try to open it again") + tail.closeFile() + continue + } + + // Check to see if we are trying to reopen and tail the exact same file (and it was not truncated). + retries := 20 + if !truncated && cf != nil && os.SameFile(cf, nf) { + retries-- + if retries <= 0 { + return errors.New("gave up trying to reopen log file with a different handle") + } + select { + case <-time.After(watch.POLL_DURATION): + tail.closeFile() + continue + case <-tail.Tomb.Dying(): + return tomb.ErrDying + } + } break } return nil @@ -228,8 +264,8 @@ func (tail *Tail) tailFileSync() { defer tail.close() if !tail.MustExist { - // deferred first open. - err := tail.reopen() + // deferred first open, not technically truncated but we don't need to check for changed files + err := tail.reopen(true) if err != nil { if err != tomb.ErrDying { tail.Kill(err) @@ -250,8 +286,9 @@ func (tail *Tail) tailFileSync() { tail.openReader() - var offset int64 = 0 + var offset int64 var err error + oneMoreRun := false // Read line by line. for { @@ -273,10 +310,9 @@ func (tail *Tail) tailFileSync() { if cooloff { // Wait a second before seeking till the end of // file when rate limit is reached. - msg := fmt.Sprintf( - "Too much log activity; waiting a second " + - "before resuming tailing") - tail.Lines <- &Line{msg, time.Now(), fmt.Errorf(msg)} + msg := ("Too much log activity; waiting a second " + + "before resuming tailing") + tail.Lines <- &Line{msg, time.Now(), errors.New(msg)} select { case <-time.After(time.Second): case <-tail.Dying(): @@ -305,10 +341,24 @@ func (tail *Tail) tailFileSync() { } } + // oneMoreRun is set true when a file is deleted, + // this is to catch events which might get missed in polling mode. + // now that the last run is completed, finish deleting the file + if oneMoreRun { + oneMoreRun = false + err = tail.finishDelete() + if err != nil { + if err != ErrStop { + tail.Kill(err) + } + return + } + } + // When EOF is reached, wait for more data to become // available. Wait strategy is based on the `tail.watcher` // implementation (inotify or polling). - err := tail.waitForChanges() + oneMoreRun, err = tail.waitForChanges() if err != nil { if err != ErrStop { tail.Kill(err) @@ -335,51 +385,57 @@ func (tail *Tail) tailFileSync() { // waitForChanges waits until the file has been appended, deleted, // moved or truncated. When moved or deleted - the file will be // reopened if ReOpen is true. Truncated files are always reopened. -func (tail *Tail) waitForChanges() error { +func (tail *Tail) waitForChanges() (bool, error) { if tail.changes == nil { - pos, err := tail.file.Seek(0, os.SEEK_CUR) + pos, err := tail.file.Seek(0, io.SeekCurrent) if err != nil { - return err + return false, err } tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos) if err != nil { - return err + return false, err } } select { case <-tail.changes.Modified: - return nil + return false, nil case <-tail.changes.Deleted: - tail.changes = nil - if tail.ReOpen { - // XXX: we must not log from a library. - tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename) - if err := tail.reopen(); err != nil { - return err - } - tail.Logger.Printf("Successfully reopened %s", tail.Filename) - tail.openReader() - return nil - } else { - tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename) - return ErrStop - } + // In polling mode we could miss events when a file is deleted, so before we give up our file handle + // run the poll one more time to catch anything we may have missed since the last poll. + return true, nil case <-tail.changes.Truncated: // Always reopen truncated files (Follow is true) tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename) - if err := tail.reopen(); err != nil { - return err + if err := tail.reopen(true); err != nil { + return false, err } tail.Logger.Printf("Successfully reopened truncated %s", tail.Filename) tail.openReader() - return nil + return false, nil case <-tail.Dying(): - return ErrStop + return false, ErrStop } panic("unreachable") } +func (tail *Tail) finishDelete() error { + tail.changes = nil + if tail.ReOpen { + // XXX: we must not log from a library. + tail.Logger.Printf("Re-opening moved/deleted file %s ...", tail.Filename) + if err := tail.reopen(false); err != nil { + return err + } + tail.Logger.Printf("Successfully reopened %s", tail.Filename) + tail.openReader() + return nil + } else { + tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename) + return ErrStop + } +} + func (tail *Tail) openReader() { if tail.MaxLineSize > 0 { // add 2 to account for newline characters diff --git a/vendor/github.com/hpcloud/tail/tail_posix.go b/vendor/github.com/hpcloud/tail/tail_posix.go index bc4dc3357abd..a21f3ded46c6 100644 --- a/vendor/github.com/hpcloud/tail/tail_posix.go +++ b/vendor/github.com/hpcloud/tail/tail_posix.go @@ -7,5 +7,17 @@ import ( ) func OpenFile(name string) (file *os.File, err error) { - return os.Open(name) + filename := name + // Check if the path requested is a symbolic link + fi, err := os.Lstat(name) + if err != nil { + return nil, err + } + if fi.Mode()&os.ModeSymlink == os.ModeSymlink { + filename, err = os.Readlink(name) + if err != nil { + return nil, err + } + } + return os.Open(filename) } diff --git a/vendor/github.com/hpcloud/tail/watch/filechanges.go b/vendor/github.com/hpcloud/tail/watch/filechanges.go index 3ce5dcecbb22..f80aead9ad32 100644 --- a/vendor/github.com/hpcloud/tail/watch/filechanges.go +++ b/vendor/github.com/hpcloud/tail/watch/filechanges.go @@ -8,7 +8,7 @@ type FileChanges struct { func NewFileChanges() *FileChanges { return &FileChanges{ - make(chan bool), make(chan bool), make(chan bool)} + make(chan bool, 1), make(chan bool, 1), make(chan bool, 1)} } func (fc *FileChanges) NotifyModified() { diff --git a/vendor/github.com/hpcloud/tail/watch/inotify.go b/vendor/github.com/hpcloud/tail/watch/inotify.go index 4478f1e1a01c..2bbfe0b606d1 100644 --- a/vendor/github.com/hpcloud/tail/watch/inotify.go +++ b/vendor/github.com/hpcloud/tail/watch/inotify.go @@ -10,7 +10,7 @@ import ( "github.com/hpcloud/tail/util" - "gopkg.in/fsnotify.v1" + "gopkg.in/fsnotify/fsnotify.v1" "gopkg.in/tomb.v1" ) @@ -75,7 +75,6 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange fw.Size = pos go func() { - defer RemoveWatch(fw.Filename) events := Events(fw.Filename) @@ -88,9 +87,11 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange select { case evt, ok = <-events: if !ok { + RemoveWatch(fw.Filename) return } case <-t.Dying(): + RemoveWatch(fw.Filename) return } @@ -99,13 +100,19 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange fallthrough case evt.Op&fsnotify.Rename == fsnotify.Rename: + RemoveWatch(fw.Filename) changes.NotifyDeleted() return + //With an open fd, unlink(fd) - inotify returns IN_ATTRIB (==fsnotify.Chmod) + case evt.Op&fsnotify.Chmod == fsnotify.Chmod: + fallthrough + case evt.Op&fsnotify.Write == fsnotify.Write: fi, err := os.Stat(fw.Filename) if err != nil { if os.IsNotExist(err) { + RemoveWatch(fw.Filename) changes.NotifyDeleted() return } diff --git a/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go b/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go index 03be4275ca20..739b3c2abf8e 100644 --- a/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go +++ b/vendor/github.com/hpcloud/tail/watch/inotify_tracker.go @@ -12,7 +12,7 @@ import ( "github.com/hpcloud/tail/util" - "gopkg.in/fsnotify.v1" + "gopkg.in/fsnotify/fsnotify.v1" ) type InotifyTracker struct { @@ -83,21 +83,21 @@ func watch(winfo *watchInfo) error { } // RemoveWatch signals the run goroutine to remove the watch for the input filename -func RemoveWatch(fname string) { - remove(&watchInfo{ +func RemoveWatch(fname string) error { + return remove(&watchInfo{ fname: fname, }) } // RemoveWatch create signals the run goroutine to remove the watch for the input filename -func RemoveWatchCreate(fname string) { - remove(&watchInfo{ +func RemoveWatchCreate(fname string) error { + return remove(&watchInfo{ op: fsnotify.Create, fname: fname, }) } -func remove(winfo *watchInfo) { +func remove(winfo *watchInfo) error { // start running the shared InotifyTracker if not already running once.Do(goRun) @@ -108,27 +108,10 @@ func remove(winfo *watchInfo) { delete(shared.done, winfo.fname) close(done) } - - fname := winfo.fname - if winfo.isCreate() { - // Watch for new files to be created in the parent directory. - fname = filepath.Dir(fname) - } - shared.watchNums[fname]-- - watchNum := shared.watchNums[fname] - if watchNum == 0 { - delete(shared.watchNums, fname) - } shared.mux.Unlock() - // If we were the last ones to watch this file, unsubscribe from inotify. - // This needs to happen after releasing the lock because fsnotify waits - // synchronously for the kernel to acknowledge the removal of the watch - // for this file, which causes us to deadlock if we still held the lock. - if watchNum == 0 { - shared.watcher.Remove(fname) - } shared.remove <- winfo + return <-shared.error } // Events returns a channel to which FileEvents corresponding to the input filename @@ -142,8 +125,8 @@ func Events(fname string) <-chan fsnotify.Event { } // Cleanup removes the watch for the input filename if necessary. -func Cleanup(fname string) { - RemoveWatch(fname) +func Cleanup(fname string) error { + return RemoveWatch(fname) } // watchFlags calls fsnotify.WatchFlags for the input filename and flags, creating @@ -154,6 +137,8 @@ func (shared *InotifyTracker) addWatch(winfo *watchInfo) error { if shared.chans[winfo.fname] == nil { shared.chans[winfo.fname] = make(chan fsnotify.Event) + } + if shared.done[winfo.fname] == nil { shared.done[winfo.fname] = make(chan bool) } @@ -163,47 +148,50 @@ func (shared *InotifyTracker) addWatch(winfo *watchInfo) error { fname = filepath.Dir(fname) } + var err error // already in inotify watch - if shared.watchNums[fname] > 0 { - shared.watchNums[fname]++ - if winfo.isCreate() { - shared.watchNums[winfo.fname]++ - } - return nil + if shared.watchNums[fname] == 0 { + err = shared.watcher.Add(fname) } - - err := shared.watcher.Add(fname) if err == nil { shared.watchNums[fname]++ - if winfo.isCreate() { - shared.watchNums[winfo.fname]++ - } } return err } // removeWatch calls fsnotify.RemoveWatch for the input filename and closes the // corresponding events channel. -func (shared *InotifyTracker) removeWatch(winfo *watchInfo) { +func (shared *InotifyTracker) removeWatch(winfo *watchInfo) error { shared.mux.Lock() - defer shared.mux.Unlock() ch := shared.chans[winfo.fname] - if ch == nil { - return + if ch != nil { + delete(shared.chans, winfo.fname) + close(ch) } - delete(shared.chans, winfo.fname) - close(ch) - - if !winfo.isCreate() { - return + fname := winfo.fname + if winfo.isCreate() { + // Watch for new files to be created in the parent directory. + fname = filepath.Dir(fname) } + shared.watchNums[fname]-- + watchNum := shared.watchNums[fname] + if watchNum == 0 { + delete(shared.watchNums, fname) + } + shared.mux.Unlock() - shared.watchNums[winfo.fname]-- - if shared.watchNums[winfo.fname] == 0 { - delete(shared.watchNums, winfo.fname) + var err error + // If we were the last ones to watch this file, unsubscribe from inotify. + // This needs to happen after releasing the lock because fsnotify waits + // synchronously for the kernel to acknowledge the removal of the watch + // for this file, which causes us to deadlock if we still held the lock. + if watchNum == 0 { + err = shared.watcher.Remove(fname) } + + return err } // sendEvent sends the input event to the appropriate Tail. @@ -238,7 +226,7 @@ func (shared *InotifyTracker) run() { shared.error <- shared.addWatch(winfo) case winfo := <-shared.remove: - shared.removeWatch(winfo) + shared.error <- shared.removeWatch(winfo) case event, open := <-shared.watcher.Events: if !open {