Skip to content

Commit

Permalink
[chore][pkg/stanza/fileconsumer] Add utility functions (#23415)
Browse files Browse the repository at this point in the history
**Description:** Convert some code blocks into utility functions and
improve readability.
  • Loading branch information
VihasMakwana committed Jun 21, 2023
1 parent 739e583 commit 7f417ab
Showing 1 changed file with 31 additions and 15 deletions.
46 changes: 31 additions & 15 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
m.clearCurrentFingerprints()
}

// makeReader take a file path, then creates reader,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReader(path string) *Reader {
// Open the files first to minimize the time between listing and opening
func (m *Manager) makeFingerprint(path string) (*Fingerprint, *os.File) {
if _, ok := m.seenPaths[path]; !ok {
if m.readerFactory.fromBeginning {
m.Infow("Started watching file", "path", path)
Expand All @@ -198,34 +194,54 @@ func (m *Manager) makeReader(path string) *Reader {
file, err := os.Open(path) // #nosec - operator must read in files defined by user
if err != nil {
m.Debugf("Failed to open file", zap.Error(err))
return nil
return nil, nil
}

fp, err := m.readerFactory.newFingerprint(file)
if err != nil {
m.Errorw("Failed creating fingerprint", zap.Error(err))
return nil
if err = file.Close(); err != nil {
m.Errorf("problem closing file %s", file.Name())
}
return nil, nil
}

if len(fp.FirstBytes) == 0 {
// Empty file, don't read it until we can compare its fingerprint
if err = file.Close(); err != nil {
m.Errorf("problem closing file %s", file.Name())
}
return nil
return nil, nil
}
return fp, file
}

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
func (m *Manager) checkDuplicates(fp *Fingerprint) bool {
for i := 0; i < len(m.currentFps); i++ {
fp2 := m.currentFps[i]
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude duplicates
if err = file.Close(); err != nil {
m.Errorf("problem closing file", "file", file.Name())
}
return nil
return true
}
}
return false
}

// makeReader take a file path, then creates reader,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReader(path string) *Reader {
// Open the files first to minimize the time between listing and opening
fp, file := m.makeFingerprint(path)
if fp == nil {
return nil
}

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
if m.checkDuplicates(fp) {
if err := file.Close(); err != nil {
m.Errorf("problem closing file", "file", file.Name())
}
return nil
}

m.currentFps = append(m.currentFps, fp)
reader, err := m.newReader(file, fp)
Expand Down

0 comments on commit 7f417ab

Please sign in to comment.