Skip to content

Commit

Permalink
chore: move evictOldStreams into separate method to use defer on mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Feb 27, 2025
1 parent ffc9763 commit 57856a4
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions pkg/limits/ingest_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (s *IngestLimits) starting(ctx context.Context) (err error) {
// the metadata map. The method also starts a goroutine to periodically evict old streams from the metadata map.
func (s *IngestLimits) running(ctx context.Context) error {
// Start the eviction goroutine
go s.evictOldStreams(ctx)
go s.evictOldStreamsPeriodic(ctx)

for {
select {
Expand Down Expand Up @@ -387,44 +387,47 @@ func (s *IngestLimits) running(ctx context.Context) error {
}
}

// evictOldStreams runs as a goroutine and periodically removes streams from the metadata map
// that haven't been seen within the configured window size. It runs every WindowSize/2 interval
// to ensure timely eviction of stale entries.
func (s *IngestLimits) evictOldStreams(ctx context.Context) {
// evictOldStreamsPeriodic runs a periodic job that removes streams that
// haven't been seen within the configured window size. It runs an interval
// half the window size.
func (s *IngestLimits) evictOldStreamsPeriodic(ctx context.Context) {
ticker := time.NewTicker(s.cfg.WindowSize / 2)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.mtx.Lock()
s.evictOldStreams(ctx)
}
}
}

cutoff := time.Now().Add(-s.cfg.WindowSize).UnixNano()
func (s *IngestLimits) evictOldStreams(ctx context.Context) {

Check warning on line 406 in pkg/limits/ingest_limits.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
s.mtx.Lock()
defer s.mtx.Unlock()

for tenant, partitions := range s.metadata {
evictedCount := 0
cutoff := time.Now().Add(-s.cfg.WindowSize).UnixNano()

for partitionID, streams := range partitions {
for i, stream := range streams {
if stream.lastSeenAt < cutoff {
s.metadata[tenant][partitionID] = append(s.metadata[tenant][partitionID][:i], s.metadata[tenant][partitionID][i+1:]...)
evictedCount++
}
}
}
for tenant, partitions := range s.metadata {
evictedCount := 0

// Clean up empty tenant maps and update gauges
if len(s.metadata[tenant]) == 0 {
delete(s.metadata, tenant)
}
// Only update recorded streams gauge if the number changed
if evictedCount > 0 {
s.metrics.tenantStreamEvictionsTotal.WithLabelValues(tenant).Add(float64(evictedCount))
for partitionID, streams := range partitions {
for i, stream := range streams {
if stream.lastSeenAt < cutoff {
s.metadata[tenant][partitionID] = append(s.metadata[tenant][partitionID][:i], s.metadata[tenant][partitionID][i+1:]...)
evictedCount++
}
}
s.mtx.Unlock()
}

// Clean up empty tenant maps and update gauges
if len(s.metadata[tenant]) == 0 {
delete(s.metadata, tenant)
}
// Only update recorded streams gauge if the number changed
if evictedCount > 0 {
s.metrics.tenantStreamEvictionsTotal.WithLabelValues(tenant).Add(float64(evictedCount))
}
}
}
Expand Down

0 comments on commit 57856a4

Please sign in to comment.