Skip to content

Commit 2195675

Browse files
authored
Shard ingest flush queue by username and metric name, to match the underlying sharding of the index. (cortexproject#271)
1 parent fcbaa93 commit 2195675

File tree

3 files changed

+41
-19
lines changed

3 files changed

+41
-19
lines changed

chunk/chunk_store.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (c *AWSStore) updateIndex(ctx context.Context, userID string, chunks []Chun
190190
func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) (map[string][]*dynamodb.WriteRequest, error) {
191191
writeReqs := map[string][]*dynamodb.WriteRequest{}
192192
for _, chunk := range chunks {
193-
metricName, err := extractMetricNameFromMetric(chunk.Metric)
193+
metricName, err := util.ExtractMetricNameFromMetric(chunk.Metric)
194194
if err != nil {
195195
return nil, err
196196
}
@@ -474,15 +474,6 @@ func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet [
474474
return chunks, nil
475475
}
476476

477-
func extractMetricNameFromMetric(m model.Metric) (model.LabelValue, error) {
478-
for name, value := range m {
479-
if name == model.MetricNameLabel {
480-
return value, nil
481-
}
482-
}
483-
return "", fmt.Errorf("no MetricNameLabel for chunk")
484-
}
485-
486477
func extractMetricNameFromMatchers(matchers []*metric.LabelMatcher) (model.LabelValue, []*metric.LabelMatcher, error) {
487478
for i, matcher := range matchers {
488479
if matcher.Name != model.MetricNameLabel {

ingester/ingester.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package ingester
33
import (
44
"flag"
55
"fmt"
6+
"hash/fnv"
67
"net/http"
78
"sync"
89
"time"
@@ -439,7 +440,9 @@ func (i *Ingester) sweepUsers(immediate bool) {
439440
for id, state := range i.userStates.cp() {
440441
for pair := range state.fpToSeries.iter() {
441442
state.fpLocker.Lock(pair.fp)
442-
i.sweepSeries(id, pair.fp, pair.series, immediate)
443+
if err := i.sweepSeries(id, pair.fp, pair.series, immediate); err != nil {
444+
log.Errorf("Error sweeping series: %v", err)
445+
}
443446
state.fpLocker.Unlock(pair.fp)
444447
}
445448
}
@@ -449,18 +452,33 @@ func (i *Ingester) sweepUsers(immediate bool) {
449452
//
450453
// NB we don't close the head chunk here, as the series could wait in the queue
451454
// for some time, and we want to encourage chunks to be as full as possible.
452-
func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) {
453-
if len(series.chunkDescs) <= 0 {
454-
return
455+
func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) error {
456+
flush := i.shouldFlushSeries(series, immediate)
457+
if !flush {
458+
return nil
455459
}
456460

457-
firstTime := series.firstTime()
458-
flush := i.shouldFlushSeries(series, immediate)
461+
return i.enqueueSeriesForFlushing(userID, fp, series, immediate)
462+
}
459463

460-
if flush {
461-
flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
462-
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
464+
func (i *Ingester) enqueueSeriesForFlushing(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) error {
465+
metricName, err := util.ExtractMetricNameFromMetric(series.metric)
466+
if err != nil {
467+
return err
463468
}
469+
470+
h := fnv.New32()
471+
if _, err := h.Write([]byte(userID)); err != nil {
472+
return err
473+
}
474+
if _, err := h.Write([]byte(metricName)); err != nil {
475+
return err
476+
}
477+
478+
flushQueueIndex := int(h.Sum32() % uint32(i.cfg.ConcurrentFlushes))
479+
firstTime := series.firstTime()
480+
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
481+
return nil
464482
}
465483

466484
func (i *Ingester) shouldFlushSeries(series *memorySeries, immediate bool) bool {

util/matchers.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package util
22

33
import (
4+
"fmt"
5+
6+
"github.com/prometheus/common/model"
47
"github.com/prometheus/prometheus/storage/metric"
58
)
69

@@ -15,3 +18,13 @@ func SplitFiltersAndMatchers(allMatchers []*metric.LabelMatcher) (filters, match
1518
}
1619
return
1720
}
21+
22+
// ExtractMetricNameFromMetric extract the metric name from a model.Metric
23+
func ExtractMetricNameFromMetric(m model.Metric) (model.LabelValue, error) {
24+
for name, value := range m {
25+
if name == model.MetricNameLabel {
26+
return value, nil
27+
}
28+
}
29+
return "", fmt.Errorf("no MetricNameLabel for chunk")
30+
}

0 commit comments

Comments
 (0)