From c640c5061abae6fd6457b51844cc8220dcf1d996 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Thu, 2 Nov 2023 09:16:51 -0600 Subject: [PATCH] [chore][pkg/stanza] Adjust length of knownFiles based on number of matches (#28646) Follows https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28493 This adjusts the length of `knownFiles` to be roughly 4x the number of matches per poll cycle. In other words, we will remember files for up to 4 poll cycles. Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/28567 --- pkg/stanza/fileconsumer/config.go | 2 +- pkg/stanza/fileconsumer/file.go | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 7e7aed2f8e52..41cbfa4e7451 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -182,7 +182,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli maxBatchFiles: c.MaxConcurrentFiles / 2, maxBatches: c.MaxBatches, previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2), - knownFiles: make([]*reader.Metadata, 0, 10*c.MaxConcurrentFiles), + knownFiles: []*reader.Metadata{}, }, nil } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 6cb08e7cea55..e2707efc365b 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -34,12 +34,26 @@ type Manager struct { previousPollFiles []*reader.Reader knownFiles []*reader.Metadata + + // This value approximates the expected number of files which we will find in a single poll cycle. + // It is updated each poll cycle using a simple moving average calculation which assigns 20% weight + // to the most recent poll cycle. + // It is used to regulate the size of knownFiles. The goal is to allow knownFiles + // to contain checkpoints from a few previous poll cycles, but not grow unbounded. + movingAverageMatches int } func (m *Manager) Start(persister operator.Persister) error { ctx, cancel := context.WithCancel(context.Background()) m.cancel = cancel + if matches, err := m.fileMatcher.MatchFiles(); err != nil { + m.Warnf("finding files: %v", err) + } else { + m.movingAverageMatches = len(matches) + m.knownFiles = make([]*reader.Metadata, 0, 4*len(matches)) + } + if persister != nil { m.persister = persister offsets, err := checkpoint.Load(ctx, m.persister) @@ -53,10 +67,6 @@ func (m *Manager) Start(persister operator.Persister) error { } } - if _, err := m.fileMatcher.MatchFiles(); err != nil { - m.Warnf("finding files: %v", err) - } - // Start polling goroutine m.startPoller(ctx) @@ -64,8 +74,8 @@ func (m *Manager) Start(persister operator.Persister) error { } func (m *Manager) closePreviousFiles() { - if forgetNum := len(m.previousPollFiles) + len(m.knownFiles) - cap(m.knownFiles); forgetNum > 0 { - m.knownFiles = m.knownFiles[forgetNum:] + if len(m.knownFiles) > 4*m.movingAverageMatches { + m.knownFiles = m.knownFiles[m.movingAverageMatches:] } for _, r := range m.previousPollFiles { m.knownFiles = append(m.knownFiles, r.Close()) @@ -116,6 +126,8 @@ func (m *Manager) poll(ctx context.Context) { matches, err := m.fileMatcher.MatchFiles() if err != nil { m.Debugf("finding files: %v", err) + } else { + m.movingAverageMatches = (m.movingAverageMatches*3 + len(matches)) / 4 } m.Debugf("matched files", zap.Strings("paths", matches))