Skip to content

Commit aa15b6b

Browse files
committed
Revert the shard-flush-queue-by-metric-name change, it was causing ingester flusing to be too slow.
1 parent 3c4b012 commit aa15b6b

File tree

1 file changed

+9
-27
lines changed

1 file changed

+9
-27
lines changed

ingester/ingester.go

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package ingester
33
import (
44
"flag"
55
"fmt"
6-
"hash/fnv"
76
"net/http"
87
"sync"
98
"time"
@@ -466,9 +465,7 @@ func (i *Ingester) sweepUsers(immediate bool) {
466465
for id, state := range i.userStates.cp() {
467466
for pair := range state.fpToSeries.iter() {
468467
state.fpLocker.Lock(pair.fp)
469-
if err := i.sweepSeries(id, pair.fp, pair.series, immediate); err != nil {
470-
log.Errorf("Error sweeping series: %v", err)
471-
}
468+
i.sweepSeries(id, pair.fp, pair.series, immediate)
472469
state.fpLocker.Unlock(pair.fp)
473470
}
474471
}
@@ -478,33 +475,18 @@ func (i *Ingester) sweepUsers(immediate bool) {
478475
//
479476
// NB we don't close the head chunk here, as the series could wait in the queue
480477
// for some time, and we want to encourage chunks to be as full as possible.
481-
func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) error {
482-
flush := i.shouldFlushSeries(series, immediate)
483-
if !flush {
484-
return nil
478+
func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) {
479+
if len(series.chunkDescs) <= 0 {
480+
return
485481
}
486482

487-
return i.enqueueSeriesForFlushing(userID, fp, series, immediate)
488-
}
489-
490-
func (i *Ingester) enqueueSeriesForFlushing(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) error {
491-
metricName, err := util.ExtractMetricNameFromMetric(series.metric)
492-
if err != nil {
493-
return err
494-
}
483+
firstTime := series.firstTime()
484+
flush := i.shouldFlushSeries(series, immediate)
495485

496-
h := fnv.New32()
497-
if _, err := h.Write([]byte(userID)); err != nil {
498-
return err
499-
}
500-
if _, err := h.Write([]byte(metricName)); err != nil {
501-
return err
486+
if flush {
487+
flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
488+
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
502489
}
503-
504-
flushQueueIndex := int(h.Sum32() % uint32(i.cfg.ConcurrentFlushes))
505-
firstTime := series.firstTime()
506-
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
507-
return nil
508490
}
509491

510492
func (i *Ingester) shouldFlushSeries(series *memorySeries, immediate bool) bool {

0 commit comments

Comments
 (0)