[chore][fileconsumer/archive] - Add archive read logic #35798
Conversation
@djaglowski were you able to look at this?
archiveReadIndex := t.archiveIndex - 1 // try loading most recently written index and iterate backwards
for i := 0; i < t.pollsToArchive; i++ {
The comment doesn't seem to describe what's happening here.
I'll update it.
}

func (t *fileTracker) SyncOffsets() {
	// SyncOffsets goes through all new (unmatched) readers and updates the metadata, if found on archive.
This wasn't my understanding of how the archive would work. We shouldn't have to reconcile multiple copies of the same metadata and worry about syncing. It's too complicated.
We should use almost the exact same patterns as when searching knownFiles. The only difference is that, in order to reduce inefficient reads/writes to storage, we should search one fileset for N fingerprints before moving on to the next fileset (as opposed to searching all filesets for one fingerprint before moving on to the next fingerprint).
Searching the archive should look something like:
func (t *fileTracker) FindFiles(fps []fingerprint.Fingerprint) []reader.Metadata {
	matchedFPs := make([]reader.Metadata, 0, len(fps))
	i := mostRecentIndex
	for n := 0; n < pollsToArchive; n++ {
		fs := loadFileset(i) // read the entire fileset at most once per poll
		var modified bool
		for _, fp := range fps {
			if matchedFP, ok := fs.Get(fp); ok { // removes fp from the fileset if matched
				matchedFPs = append(matchedFPs, matchedFP)
				modified = true
			}
		}
		if modified {
			saveFileset(i, fs) // overwrite the entire fileset at most once per poll
		}
		i = (i + 1) % pollsToArchive
	}
	return matchedFPs
}
This way filesets in the archive are kept up to date in near-real time. The only time when they could be out of date is before the function returns. It also ensures we are minimizing interactions with the archive. (One thing I didn't include in my pseudocode is an early exit once all fingerprints have been found, but we should do that too.)
At a high level, this is what I did, but the syncing/reconciling made it complex. I agree with you.
I agree on this. It's overkill.
Again, I agree. This is precisely what I did for opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker/tracker.go Lines 192 to 227 in 2936928
@djaglowski I'd like your thoughts on an approach. Note:
For archiving, our main focus is to be efficient while accessing disk storage. The current implementation looks as follows (correct me if I'm wrong): it becomes very difficult to integrate archiving the way we want (i.e. searching one fileset for N fingerprints before moving on to the next fileset). Going through paths and creating readers one at a time adds to the difficulty. I propose a new way to create readers that takes care of our requirements. I think what we need is to combine
In other words, we need to divide our function as per checkpoints:
As of now, only one loop exists, and it becomes very difficult to integrate archiving in an efficient manner. Hence, I believe we need to decouple things. Please let me know your thoughts on this.
@djaglowski Here’s a pseudocode outline for the new approach. I've omitted the archiving section for simplicity, but we can easily incorporate it into the outline below.

type Record struct {
	file        *os.File
	fingerprint *fingerprint.Fingerprint
	metadata    *reader.Metadata // metadata is non-nil if a file is found in knownFiles.
}
func makeReaders(paths []string) {
records := make([]*Record, 0, len(paths))
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
continue
}
records = append(records, &Record{file: file, fingerprint: fp})
}
findFiles(records) // update records with matched metadata, in-place
// create new readers once matching is done
for _, record := range records {
// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
if r := m.tracker.GetCurrentFile(record.fingerprint); r != nil {
m.tracker.Add(r)
record.file.Close()
continue
}
r, err := m.newReader(ctx, record)
if err != nil {
m.set.Logger.Error("Failed to create reader", zap.Error(err))
continue
}
m.tracker.Add(r)
}
}
// findFiles loops through the records, matches them against the offsets in memory and updates record.metadata with found metadata
func findFiles(records []*Record) {
for _, record := range records {
// update record.Metadata if match is found
if oldReader := t.GetOpenFile(record.fingerprint); oldReader != nil {
record.metadata = oldReader.Close()
} else if oldMetadata := t.GetClosedFile(record.fingerprint); oldMetadata != nil {
record.metadata = oldMetadata
}
}
}
func (m *Manager) newReader(ctx context.Context, record *tracker.Record) (*reader.Reader, error) {
if record.metadata != nil {
return m.readerFactory.NewReaderFromMetadata(record.file, record.metadata)
} else {
// If we don't match any previously known files, create a new reader from scratch
m.set.Logger.Info("Started watching file", zap.String("path", record.file.Name()))
return m.readerFactory.NewReader(record.file, record.fingerprint)
}
}
I don't see it. It's fundamentally doing something different than what I described.
What I have suggested is that there should be no need for syncing files. We only need:
@djaglowski This raises a concern: we need a way to link the fingerprint or metadata to a specific
If we only return an array of metadata, we won't have the corresponding os.File instance needed to create a reader. What are your thoughts on this? Or am I missing something here? Please correct me if I'm going off track.
@djaglowski I’ve removed the unnecessary state from the readers and reverted to the original pseudocode. Please take a look! I’d also like to discuss future PRs to ensure we’re aligned.
func makeReaders(paths []string) {
unmatchedFiles := make([]*Record, 0)
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
continue
}
// ...Exclude duplicate paths
if (fp found in tracker) {
// call NewReaderFromMetadata
} else {
unmatchedFiles = append(unmatchedFiles, Record{File: file, Fingerprint: fp})
}
}
// Update unmatchedFiles in place by searching for any matches in the archive via the tracker
// This method will populate the Metadata field in the Record if a match is found
tracker.FindFiles(unmatchedFiles) // this method is introduced in current PR
// Now, process the unmatchedFiles to determine if they have been matched in the archive
for _, record := range unmatchedFiles {
// Check if Metadata has been populated, indicating a successful match
if record.Metadata != nil {
r, err := NewReaderFromMetadata(record.File, record.Metadata)
tracker.Add(r)
} else {
r, err := NewReader(record.File, record.Fingerprint)
tracker.Add(r)
}
}
}
@djaglowski were you able to take a look at this? Let me know your thoughts!
That's a good point. I think we can handle that a couple different ways. One would be to just return a slice of the same size, where each index of the result slice corresponds to the fingerprint of the same index in the input slice.
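The index-aligned idea can be sketched in a few lines. The types below are hypothetical stand-ins for `fingerprint.Fingerprint` and `reader.Metadata`, and the archive lookup is faked with a map, purely to show the shape of the return value:

```go
package main

import "fmt"

// Hypothetical stand-ins; the real types live in the fingerprint and reader packages.
type Fingerprint struct{ FirstBytes string }
type Metadata struct{ Offset int64 }

// findFiles returns a slice the same length as fps: result[i] holds the
// metadata matched for fps[i], or nil if the archive had no match. The caller
// keeps the link between each fingerprint and its os.File by index.
func findFiles(fps []*Fingerprint, archive map[string]*Metadata) []*Metadata {
	result := make([]*Metadata, len(fps))
	for i, fp := range fps {
		result[i] = archive[fp.FirstBytes] // nil when absent
	}
	return result
}

func main() {
	archive := map[string]*Metadata{"a": {Offset: 10}}
	got := findFiles([]*Fingerprint{{"a"}, {"b"}}, archive)
	fmt.Println(got[0].Offset, got[1] == nil) // 10 true
}
```

Because the result is positionally aligned with the input, no extra struct or map is needed to associate matched metadata with the file handle that produced each fingerprint.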
Let's not get into future PRs in this PR. I don't see a need for the
I'll explore a few different ways and update the PR.
@djaglowski can you take a fresh look? I've removed the
func Mod(x, y int) int {
	return ((x % y) + y) % y
}
This is needed because in Go, the % operator is a remainder, not a true mathematical modulus, and it can return negative integers.
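To make the distinction concrete, here is a small runnable example using the `Mod` helper from the diff above. Go's `%` takes the sign of the dividend, which would break the backwards ring-buffer iteration over archive indices:

```go
package main

import "fmt"

// Mod is the positive-modulus helper from the diff above.
func Mod(x, y int) int {
	return ((x % y) + y) % y
}

func main() {
	// Go's % is a remainder: its sign follows the dividend.
	fmt.Println(-3 % 7)     // -3
	// Mod always lands in [0, y), so it is safe as a ring-buffer index.
	fmt.Println(Mod(-3, 7)) // 4
	fmt.Println(Mod(10, 7)) // 3
}
```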
func (f *Fingerprint) GetFingerprint() *Fingerprint {
	return f
}
I've added this so the Fingerprint can implement the Matchable interface, giving us a unified return type for
opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker/tracker.go
Lines 194 to 195 in ad46f94
func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []fileset.Matchable {
	// To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found.
I think this is neater than returning []any
or creating a struct to capture the fingerprint/metadata.
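A minimal sketch of that unified-return-type idea: both a fingerprint and a metadata record satisfy a `Matchable` interface by exposing their fingerprint. The type and field definitions below are illustrative simplifications, not the PR's exact code:

```go
package main

import "fmt"

// Simplified stand-ins for the real fingerprint and reader types.
type Fingerprint struct{ FirstBytes []byte }

// GetFingerprint lets a Fingerprint satisfy Matchable by returning itself,
// mirroring the method added in the diff above.
func (f *Fingerprint) GetFingerprint() *Fingerprint { return f }

type Metadata struct {
	Fingerprint *Fingerprint
	Offset      int64
}

func (m *Metadata) GetFingerprint() *Fingerprint { return m.Fingerprint }

// Matchable is the unified return type: a slot can hold either an unmatched
// fingerprint or the matched metadata, without resorting to []any.
type Matchable interface {
	GetFingerprint() *Fingerprint
}

func main() {
	fp := &Fingerprint{FirstBytes: []byte("abc")}
	results := []Matchable{fp, &Metadata{Fingerprint: fp, Offset: 42}}
	for _, r := range results {
		fmt.Println(string(r.GetFingerprint().FirstBytes))
	}
}
```

Callers can type-switch on each element when they need to distinguish a matched `*Metadata` from a still-unmatched `*Fingerprint`, while code that only needs the fingerprint treats both uniformly.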
	"go.opentelemetry.io/collector/component/componenttest"
)

func TestFindFilesOrder(t *testing.T) {
Let me know your thoughts over this test @djaglowski.
This PR follows #35098.
Description
Future PRs