
[chore][fileconsumer/archive] - Add archive read logic #35798

Open
wants to merge 47 commits into main

Conversation

VihasMakwana
Contributor

This PR follows #35098.

Description

  • This PR adds the core logic for matching against the archive. Check this out for the core logic.

Future PRs

  • As of now, we don't keep track of the most recently written index across collector restarts. This is simple to accomplish, and we can use the persister for it. I haven't implemented it in the current PR because I want to keep the focus solely on the reading part. We can address it later in this PR or in a separate PR, independently.
  • Testing and enabling: Once we establish common ground on reading from the archive, we can proceed with testing and enabling the configuration.

@VihasMakwana
Contributor Author

@djaglowski were you able to look at this?

pkg/stanza/fileconsumer/internal/reader/factory.go (outdated; resolved)
pkg/stanza/fileconsumer/internal/tracker/tracker.go (outdated; resolved)
pkg/stanza/fileconsumer/internal/tracker/tracker.go (outdated; resolved)
Comment on lines 198 to 199
archiveReadIndex := t.archiveIndex - 1 // try loading most recently written index and iterate backwards
for i := 0; i < t.pollsToArchive; i++ {
Member

The comment doesn't seem to describe what's happening here.

Contributor Author

I'll update it.

}

func (t *fileTracker) SyncOffsets() {
// SyncOffsets goes through all new (unmatched) readers and updates the metadata, if found on archive.
Member

This wasn't my understanding of how the archive would work. We shouldn't have to reconcile multiple copies of the same metadata and worry about syncing. It's too complicated.

We should use almost the exact same patterns as when searching knownFiles. The only difference is that in order to reduce inefficient read/writes to storage, we should search 1 fileset for N fingerprints before moving on to the next fileset. (As opposed to searching all filesets for 1 fingerprint before moving on to the next fingerprint.)

Searching the archive should look something like:

func (t *fileTracker) FindFiles(fps []fingerprint.Fingerprint) []reader.Metadata {
    matchedFPs := make([]reader.Metadata, 0, len(fps))
    for n := 0; n < pollsToArchive; n++ {
        i := (mostRecentIndex + n) % pollsToArchive
        fs := loadFileset(i) // read the entire fileset at most once per poll
        var modified bool
        for _, fp := range fps {
            if matchedFP := fs.Get(fp); matchedFP != nil { // Get removes fp if matched
                matchedFPs = append(matchedFPs, matchedFP)
                modified = true
            }
        }
        if modified {
            saveFileset(i, fs) // overwrite the entire fileset at most once per poll
        }
    }
    return matchedFPs
}

This way, filesets in the archive are kept up to date in near-real time. The only time they could be out of date is before the function returns. It also ensures we minimize interactions with the archive. (One thing I didn't include in my pseudocode is an early exit once all fingerprints have been found, but we should do that too.)
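To make the intent concrete, here is a runnable version of the pattern above, with simplified stand-in types (the real `fileset` and `fingerprint` packages look different): one fileset is loaded at a time, matched against all outstanding fingerprints, and rewritten only if modified, with an early exit once everything is matched.

```go
package main

import (
	"bytes"
	"fmt"
)

// Simplified stand-ins for the fileset/fingerprint types; real signatures differ.
type Fingerprint []byte
type Metadata struct{ FirstBytes Fingerprint }
type Fileset []Metadata

// Get removes and returns the entry whose fingerprint matches, or nil.
func (fs *Fileset) Get(fp Fingerprint) *Metadata {
	for i, md := range *fs {
		if bytes.Equal(md.FirstBytes, fp) {
			*fs = append((*fs)[:i], (*fs)[i+1:]...)
			return &md
		}
	}
	return nil
}

// findFiles searches each archived fileset once, newest first, matching all
// outstanding fingerprints against it before moving to the next fileset.
func findFiles(archive []Fileset, mostRecent int, fps []Fingerprint) []Metadata {
	var matched []Metadata
	n := len(archive)
	for off := 0; off < n && len(fps) > 0; off++ {
		i := ((mostRecent-off)%n + n) % n // wrap backwards without going negative
		fs := &archive[i]                 // loadFileset(i) in the real code
		remaining := fps[:0]
		modified := false
		for _, fp := range fps {
			if md := fs.Get(fp); md != nil { // a match removes the entry
				matched = append(matched, *md)
				modified = true
			} else {
				remaining = append(remaining, fp)
			}
		}
		fps = remaining
		_ = modified // saveFileset(i, fs) would run here when modified
	}
	return matched
}

func main() {
	archive := []Fileset{
		{{FirstBytes: Fingerprint("a")}},
		{{FirstBytes: Fingerprint("b")}},
	}
	got := findFiles(archive, 1, []Fingerprint{Fingerprint("b")})
	fmt.Println(len(got)) // prints 1
}
```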

Contributor Author

At a high level, this is what I did, but the syncing/reconciling made it complex. I agree with you.

@VihasMakwana
Contributor Author

VihasMakwana commented Oct 29, 2024

This wasn't my understanding of how the archive would work. We shouldn't have to reconcile multiple copies of the same metadata and worry about syncing. It's too complicated.

I agree on this. It's overkill.

We should use almost the exact same patterns as when searching knownFiles. The only difference is that in order to reduce inefficient read/writes to storage, we should search 1 fileset for N fingerprints before moving on to the next fileset. (As opposed to searching all filesets for 1 fingerprint before moving on to the next fingerprint.)

Again, I agree. This is precisely what I did for SyncOffsets()

func (t *fileTracker) SyncOffsets() {
	// SyncOffsets goes through all new (unmatched) readers and updates the metadata, if found on archive.
	// To minimize disk access, we first access the index, then review unmatched readers and synchronize their metadata if a match is found.
	// We exit if no new reader exists.
	archiveReadIndex := t.archiveIndex - 1 // try loading most recently written index and iterate backwards
	for i := 0; i < t.pollsToArchive; i++ {
		newFound := false
		data, err := t.readArchive(archiveReadIndex)
		if err != nil {
			t.set.Logger.Error("error while opening archive", zap.Error(err))
			continue
		}
		for _, v := range t.currentPollFiles.Get() {
			if v.IsNew() {
				newFound = true
				if md := data.Match(v.GetFingerprint(), fileset.StartsWith); md != nil {
					v.SyncMetadata(md)
				}
			}
		}
		if !newFound {
			// No new reader is available, so there’s no need to go through the rest of the archive.
			// Just exit to save time.
			break
		}
		if err := t.updateArchive(archiveReadIndex, data); err != nil {
			t.set.Logger.Error("error while opening archive", zap.Error(err))
			continue
		}
		archiveReadIndex = (archiveReadIndex - 1) % t.pollsToArchive
	}
}
The comments were incorrect, though. I'll be more careful next time.


@djaglowski I'd like your thoughts on an approach.

Note:

  • We need both the file and the fingerprint to create a new reader. To link them together, we can create a struct that includes references to both the file and the fingerprint. For now, let’s call it a record.

For archiving, our main focus is to be efficient when accessing disk storage.

The current implementation looks like the following; correct me if I'm wrong:
[screenshot: diagram of the current reader-creation flow]

It becomes very difficult to integrate archiving the way we want (i.e. we should search 1 fileset for N fingerprints before moving on to the next fileset). Going through paths and creating readers one at a time adds to the difficulty.

I propose a new way to create readers, which takes care of our requirements.

I think what we need is to first combine the files and fingerprints into records (the struct containing the file and fingerprint):

  1. Next, we'll check each record for a match in memory.
  2. Then, we'll go through each archive fileset to find matches for the still-unmatched records.
  3. Finally, we'll have a loop that creates readers from the matched metadata and adds them to the tracker.

In other words, we need to divide our function as per checkpoints:

  1. 1st loop to combine files and fingerprints into an array.
  2. 2nd loop to go through the combined array and try finding a match in memory.
  3. Reading from archive.
  4. Finally, create readers from records.

As of now, only one loop exists and it becomes very difficult to integrate archiving in an efficient manner. Hence, I believe we need to decouple things.

Please let me know your thoughts on this.

@VihasMakwana
Contributor Author

VihasMakwana commented Oct 29, 2024

@djaglowski Here’s a pseudocode outline for the new approach. I've omitted the archiving section for simplicity, but we can easily incorporate it into FindFiles.

type Record struct {
	file *os.File
	fingerprint *fingerprint.Fingerprint
	metadata *reader.Metadata // metadata is non-nil if a file is found in knownFiles. 
}

func makeReaders(paths []string) {
	records := make([]*Record, 0, len(paths))
	for _, path := range paths {
		fp, file := m.makeFingerprint(path)
		if fp == nil {
			continue
		}
		records = append(records, &Record{file: file, fingerprint: fp})
	}

	findFiles(records) // update records with matched metadata, in-place

	// create new readers once matching is done
	for _, record := range records {
		// Exclude duplicate paths with the same content. This can happen when files are
		// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
		if r := m.tracker.GetCurrentFile(record.fingerprint); r != nil {
			m.tracker.Add(r)
			record.file.Close()
			continue
		}
		r, err := m.newReader(ctx, record)
		if err != nil {
			m.set.Logger.Error("Failed to create reader", zap.Error(err))
			continue
		}

		m.tracker.Add(r)

	}

}

// findFiles loops through the records, matches them against the offsets in memory and updates record.metadata with found metadata
func findFiles(records []*Record) {
	for _, record := range records {

        // update record.Metadata if match is found
		if oldReader := t.GetOpenFile(record.fingerprint); oldReader != nil {
			record.metadata = oldReader.Close()
		} else if oldMetadata := t.GetClosedFile(record.fingerprint); oldMetadata != nil {
			record.metadata = oldMetadata
		}
	}
}


func (m *Manager) newReader(ctx context.Context, record *tracker.Record) (*reader.Reader, error) {

	if record.metadata != nil {
		return m.readerFactory.NewReaderFromMetadata(record.file, record.metadata)
	} else {
		// If we don't match any previously known files, create a new reader from scratch
		m.set.Logger.Info("Started watching file", zap.String("path", record.file.Name()))
		return m.readerFactory.NewReader(record.file, record.fingerprint)
	}
}

@djaglowski
Member

This is precisely what I did for SyncOffsets()

I don't see it. It's fundamentally doing something different than what I described. SyncOffsets is

  1. iterating through all sets in the archive
  2. operating on individual files
  3. dependent on unnecessary state that you've added to readers

What I have suggested is that there should be no need for syncing files. We only need:

  1. Load a set from the archive. Search it for matches. Remove matches from the set. Return the matches. If any matches were found, rewrite the set to the archive.
  2. At the end of a poll, instead of deleting the oldest set in knownFiles, write it to the archive.
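The two points above can be sketched with toy types; the `tracker` fields and helper names here are hypothetical, not the actual fileconsumer structures. At the end of a poll, the oldest `knownFiles` set is written into the archive ring buffer instead of being dropped:

```go
package main

import "fmt"

type Fileset []string // simplified: a set of file metadata entries

type tracker struct {
	knownFiles []Fileset // newest first; fixed length == number of kept polls
	archive    []Fileset // ring buffer of pollsToArchive slots
	archiveIdx int
}

// endPoll rotates knownFiles; instead of discarding the oldest set, it is
// written into the archive ring, advancing the write index with wraparound.
func (t *tracker) endPoll(current Fileset) {
	oldest := t.knownFiles[len(t.knownFiles)-1]
	t.archive[t.archiveIdx] = oldest // writeArchive(t.archiveIdx, oldest) in real code
	t.archiveIdx = (t.archiveIdx + 1) % len(t.archive)
	copy(t.knownFiles[1:], t.knownFiles[:len(t.knownFiles)-1])
	t.knownFiles[0] = current
}

func main() {
	t := &tracker{
		knownFiles: []Fileset{{"b"}, {"a"}}, // "a" is the oldest set
		archive:    make([]Fileset, 3),
	}
	t.endPoll(Fileset{"c"})
	fmt.Println(t.archive[0][0], t.knownFiles[0][0]) // prints: a c
}
```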

@VihasMakwana
Contributor Author

VihasMakwana commented Nov 1, 2024

What I have suggested is that there should be no need for syncing files. We only need:

  1. Load a set from the archive. Search it for matches. Remove matches from the set. Return the matches. If any matches were found, rewrite the set to the archive.
  2. At the end of a poll, instead of deleting the oldest set in knownFiles, write it to the archive.

@djaglowski
I see your point.
In your example, the FindFiles function is defined as FindFiles(fps []fingerprint.Fingerprint) []readerMetadata. However, for creating a reader, we have two function signatures:
NewReaderFromMetadata(file *os.File, m *Metadata) and NewReader(file *os.File, fp *Fingerprint).

This raises a concern: we need a way to link the fingerprint or metadata to a specific os.File.
Essentially, we need to indicate that "metadata x belongs to file y," so we can then call NewReaderFromMetadata(x, y).


If we only return an array of metadata, we won't have the corresponding os.File instance needed to create a reader. What are your thoughts on this? Or am I missing something here? Please correct me if I'm going off track.

@VihasMakwana
Contributor Author

VihasMakwana commented Nov 1, 2024

@djaglowski
First, thank you for your patience. I’ve made updates to the PR based on your feedback.

I’ve removed the unnecessary state from the readers and reverted to the original pseudocode. Please take a look!


I’d also like to discuss future PRs to ensure we’re aligned.
If you review this PR again, you’ll notice the introduction of a new convenience struct called Record.

  • Why do we need this?
    • It simplifies reader creation. You can see more details here.
  • What would future PRs look like, and how do we plan to use the Record struct?
    • At a high level, they will follow a structure similar to the following pseudocode:
func makeReaders(paths []string) {
	unmatchedFiles := make([]*Record, 0)
	for _, path := range paths {
		fp, file := m.makeFingerprint(path)
		if fp == nil {
			continue
		}
		// ...Exclude duplicate paths
		if (fp found in tracker) {
			// call NewReaderFromMetadata
		} else {
			unmatchedFiles = append(unmatchedFiles, Record{File: file, Fingerprint: fp})
		}
	}
	
	// Update unmatchedFiles in place by searching for any matches in the archive via the tracker
	// This method will populate the Metadata field in the Record if a match is found
	tracker.FindFiles(unmatchedFiles) // this method is introduced in current PR
	
	// Now, process the unmatchedFiles to determine if they have been matched in the archive
	for _, record := range unmatchedFiles {
		// Check if Metadata has been populated, indicating a successful match
		if record.Metadata != nil {
			r, err := NewReaderFromMetadata(record.File, record.Metadata)
			tracker.Add(r)
		} else {
			r, err := NewReader(record.File, record.Fingerprint)
			tracker.Add(r)
		}
	}
}

@VihasMakwana
Contributor Author

@djaglowski were you able to take a look at this? Let me know your thoughts!

@djaglowski
Member

@djaglowski I see your point. In your example, the FindFiles function is defined as FindFiles(fps []fingerprint.Fingerprint) []readerMetadata. However, for creating a reader, we have two function signatures: NewReaderFromMetadata(file *os.File, m *Metadata) and NewReader(file *os.File, fp *Fingerprint).

This raises a concern: we need a way to link the fingerprint or metadata to a specific os.File. Essentially, we need to indicate that "metadata x belongs to file y," so we can then call NewReaderFromMetadata(x, y).

That's a good point. I think we can handle that a couple different ways. One would be to just return a slice of the same size, where each index of the result slice corresponds to the fingerprint of the same index in the input slice.
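A small sketch of that index-aligned shape, with simplified stand-in types: the result slice has the same length as the input, and a `nil` entry means the archive held no match for the fingerprint at that index.

```go
package main

import (
	"bytes"
	"fmt"
)

// Simplified stand-ins for the real fingerprint/reader types.
type Fingerprint []byte
type Metadata struct{ FirstBytes []byte }

// findFiles returns one slot per input fingerprint; a nil entry means no
// match, so the caller can still pair each result with its os.File by index.
func findFiles(archive []Metadata, fps []Fingerprint) []*Metadata {
	matched := make([]*Metadata, len(fps)) // same size as the input slice
	for i, fp := range fps {
		for j := range archive {
			if bytes.Equal(archive[j].FirstBytes, fp) {
				matched[i] = &archive[j]
				break
			}
		}
	}
	return matched
}

func main() {
	archive := []Metadata{{FirstBytes: []byte("old")}}
	results := findFiles(archive, []Fingerprint{[]byte("new"), []byte("old")})
	fmt.Println(results[0] == nil, results[1] != nil) // prints: true true
}
```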

@djaglowski
Member

I’d also like to discuss future PRs to ensure we’re aligned. If you review this PR again, you’ll notice the introduction of a new convenience struct called Record.

Let's not get into future PRs in this PR. I don't see a need for the Record struct but I do see some pitfalls to using it. Let's leave it out until it is proven to be necessary.

@VihasMakwana
Contributor Author

@djaglowski I see your point. In your example, the FindFiles function is defined as FindFiles(fps []fingerprint.Fingerprint) []readerMetadata. However, for creating a reader, we have two function signatures: NewReaderFromMetadata(file *os.File, m *Metadata) and NewReader(file *os.File, fp *Fingerprint).
This raises a concern: we need a way to link the fingerprint or metadata to a specific os.File. Essentially, we need to indicate that "metadata x belongs to file y," so we can then call NewReaderFromMetadata(x, y).

That's a good point. I think we can handle that a couple different ways. One would be to just return a slice of the same size, where each index of the result slice corresponds to the fingerprint of the same index in the input slice.

I'll explore a few different ways and update the PR.

@VihasMakwana
Contributor Author

@djaglowski can you take a fresh look? I've removed Record, and each index of the result slice now corresponds to the fingerprint at the same index in the input slice. Let me know what you think!


func Mod(x, y int) int {
	return ((x % y) + y) % y
}
Contributor Author

VihasMakwana commented Nov 9, 2024

This is needed because in Go, the % operator computes the remainder, not a true mathematical modulus, so it can return negative integers.
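A quick demonstration (this is standard Go semantics for `%`):

```go
package main

import "fmt"

// Mod mirrors the helper above: a non-negative modulus even for negative x.
func Mod(x, y int) int {
	return ((x % y) + y) % y
}

func main() {
	// Go's % keeps the sign of the dividend, so stepping an index backwards
	// past zero with plain % would yield a negative archive index.
	fmt.Println(-1 % 10)     // prints -1
	fmt.Println(Mod(-1, 10)) // prints 9
}
```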

func (f *Fingerprint) GetFingerprint() *Fingerprint {
	return f
}

Contributor Author

I've added this so that Fingerprint can implement the Matchable interface, giving us a unified return type for

func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []fileset.Matchable {
	// To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found.

I think this is neater than returning []any or creating a struct to capture the fingerprint/metadata.
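A self-contained sketch of that design, with type shapes simplified from the real `fingerprint`/`fileset` packages: both `*Fingerprint` and `*Metadata` satisfy `Matchable`, so `FindFiles` can return `[]Matchable` without resorting to `[]any` or a wrapper struct.

```go
package main

import "fmt"

type Fingerprint struct{ FirstBytes []byte }

// GetFingerprint lets a Fingerprint stand in for itself in match results.
func (f *Fingerprint) GetFingerprint() *Fingerprint { return f }

type Metadata struct{ Fingerprint *Fingerprint }

// GetFingerprint exposes the fingerprint the metadata was recorded under.
func (m *Metadata) GetFingerprint() *Fingerprint { return m.Fingerprint }

// Matchable is the unified return type: either an unmatched fingerprint or
// the matched metadata, both addressable by fingerprint.
type Matchable interface {
	GetFingerprint() *Fingerprint
}

func main() {
	fp := &Fingerprint{FirstBytes: []byte("abc")}
	results := []Matchable{fp, &Metadata{Fingerprint: fp}}
	for _, r := range results {
		fmt.Println(string(r.GetFingerprint().FirstBytes)) // prints abc twice
	}
}
```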


func TestFindFilesOrder(t *testing.T) {
Contributor Author

Let me know your thoughts over this test @djaglowski.
