Skip to content

Commit

Permalink
chore: improve logic
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Nov 1, 2024
1 parent 2936928 commit b5c571f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 35 deletions.
1 change: 0 additions & 1 deletion pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ func (m *Manager) makeReaders(ctx context.Context, paths []string) {

m.tracker.Add(r)
}
m.tracker.SyncOffsets()
}

func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
if err != nil {
return nil, err
}
r.new = true // indicates that a reader is new (no previously known offset)
return r, nil
}

Expand Down
5 changes: 0 additions & 5 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type Reader struct {
includeFileRecordNum bool
compression string
acquireFSLock bool
new bool // indicates that a reader is new (no previously known offset)
}

// ReadToEnd will read until the end of the file
Expand Down Expand Up @@ -240,9 +239,6 @@ func (r *Reader) GetFileName() string {
return r.fileName
}

func (r *Reader) IsNew() bool {
return r.new
}
func (m Metadata) GetFingerprint() *fingerprint.Fingerprint {
return m.Fingerprint
}
Expand All @@ -264,5 +260,4 @@ func (r *Reader) updateFingerprint() {

func (r *Reader) SyncMetadata(m *Metadata) {
r.Metadata = m
r.new = false
}
63 changes: 35 additions & 28 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"context"
"fmt"
"os"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
Expand All @@ -17,6 +18,12 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// Record ties an open file and its fingerprint to any reader metadata known for it.
// It is the unit of lookup passed to Tracker.FindFiles.
type Record struct {
// File is the open handle for the tracked file.
File *os.File
// Fingerprint identifies the file's content; FindFiles matches it against archived filesets.
Fingerprint *fingerprint.Fingerprint
// Metadata holds previously known reader state; FindFiles updates it when an archive match is found.
Metadata *reader.Metadata
}

// Interface for tracking files that are being consumed.
type Tracker interface {
Add(reader *reader.Reader)
Expand All @@ -31,7 +38,7 @@ type Tracker interface {
EndPoll()
EndConsume() int
TotalReaders() int
SyncOffsets()
FindFiles(records []*Record)
}

// fileTracker tracks known offsets for files that are being consumed by the manager.
Expand Down Expand Up @@ -165,14 +172,14 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
if t.pollsToArchive <= 0 || t.persister == nil {
return
}
if err := t.updateArchive(t.archiveIndex, metadata); err != nil {
if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// readArchive loads data from the archive for a given index and returns a fileset.Filset.
func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
key := fmt.Sprintf("knownFiles%d", index)
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key)
if err != nil {
Expand All @@ -183,45 +190,45 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata]
return f, nil
}

func (t *fileTracker) updateArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
// updateArchive saves data to the archive for a given index and returns an error, if encountered.
// writeArchive persists the given metadata fileset to the archive slot identified
// by index, returning any error reported by the checkpoint layer.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
	return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), fmt.Sprintf("knownFiles%d", index))
}

func (t *fileTracker) SyncOffsets() {
// SyncOffsets goes through all new (unmatched) readers and updates the metadata, if found on archive.
func (t *fileTracker) FindFiles(records []*Record) {
// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints agains that loaded set.

Check failure on line 200 in pkg/stanza/fileconsumer/internal/tracker/tracker.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, pkg)

"agains" is a misspelling of "against"

Check failure on line 200 in pkg/stanza/fileconsumer/internal/tracker/tracker.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, pkg)

"agains" is a misspelling of "against"

Check failure on line 200 in pkg/stanza/fileconsumer/internal/tracker/tracker.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, pkg)

"agains" is a misspelling of "against"

Check failure on line 200 in pkg/stanza/fileconsumer/internal/tracker/tracker.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, pkg)

"agains" is a misspelling of "against"

// To minimize disk access, we first access the index, then review unmatched readers and synchronize their metadata if a match is found.
// To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found.
// We exit if no new reader exists.

archiveReadIndex := t.archiveIndex - 1 // try loading most recently written index and iterate backwards
for i := 0; i < t.pollsToArchive; i++ {
newFound := false
data, err := t.readArchive(archiveReadIndex)
mostRecentIndex := t.archiveIndex - 1
foundRecords := 0

// continue executing the loop until either all records are matched or all archive sets have been processed.
for i := 0; i < t.pollsToArchive && foundRecords < len(records); i++ {
modified := false
data, err := t.readArchive(mostRecentIndex)
if err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
continue
}
for _, v := range t.currentPollFiles.Get() {
if v.IsNew() {
newFound = true
if md := data.Match(v.GetFingerprint(), fileset.StartsWith); md != nil {
v.SyncMetadata(md)
}
for _, record := range records {
if md := data.Match(record.Fingerprint, fileset.StartsWith); md != nil && record.Metadata != nil {
// update a record's metadata with the matched metadata.
modified = true
record.Metadata = md
foundRecords++
}
}
if !newFound {
// No new reader is available, so there’s no need to go through the rest of the archive.
// Just exit to save time.
break
}
if err := t.updateArchive(archiveReadIndex, data); err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
continue
if modified {
if err := t.writeArchive(mostRecentIndex, data); err != nil {
t.set.Logger.Error("error while opening archive", zap.Error(err))
continue
}
}

archiveReadIndex = (archiveReadIndex - 1) % t.pollsToArchive
mostRecentIndex = (mostRecentIndex - 1) % t.pollsToArchive
}

}
Expand Down Expand Up @@ -281,4 +288,4 @@ func (t *noStateTracker) EndPoll() {}

// TotalReaders always reports zero: a noStateTracker keeps no reader state.
func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) SyncOffsets() {}
// FindFiles is a no-op: a noStateTracker maintains no archive to match records against.
func (t *noStateTracker) FindFiles([]*Record) {}

0 comments on commit b5c571f

Please sign in to comment.