Skip to content

Push reduce one hash operation of Labels. #4945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* [ENHANCEMENT] Enhance traces with hostname information. #4898
* [ENHANCEMENT] Improve the documentation around limits. #4905
* [ENHANCEMENT] Distributor: cache user overrides to reduce lock contention. #4904
* [ENHANCEMENT] Ingester: Push now performs one fewer hash computation of the series Labels. #4945
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
* [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
Expand Down
31 changes: 3 additions & 28 deletions pkg/ingester/active_series.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
package ingester

import (
"hash"
"math"
"sync"
"time"

"github.com/cespare/xxhash"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util"
)

const (
Expand Down Expand Up @@ -53,30 +48,10 @@ func NewActiveSeries() *ActiveSeries {
}

// UpdateSeries updates the series timestamp to 'now'. labelsCopy is called to make a copy of the labels if the entry doesn't exist yet.
func (c *ActiveSeries) UpdateSeries(series labels.Labels, now time.Time, labelsCopy func(labels.Labels) labels.Labels) {
fp := fingerprint(series)
stripeID := fp % numActiveSeriesStripes

c.stripes[stripeID].updateSeriesTimestamp(now, series, fp, labelsCopy)
}

var sep = []byte{model.SeparatorByte}

var hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}

func fingerprint(series labels.Labels) uint64 {
sum := hashPool.Get().(hash.Hash64)
defer hashPool.Put(sum)

sum.Reset()
for _, label := range series {
_, _ = sum.Write(util.YoloBuf(label.Name))
_, _ = sum.Write(sep)
_, _ = sum.Write(util.YoloBuf(label.Value))
_, _ = sum.Write(sep)
}
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, labelsCopy func(labels.Labels) labels.Labels) {
stripeID := hash % numActiveSeriesStripes

return sum.Sum64()
c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, labelsCopy)
}

// Purge removes expired entries from the cache. This function should be called
Expand Down
41 changes: 25 additions & 16 deletions pkg/ingester/active_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,33 @@ import (
"sync"
"testing"
"time"
"unsafe"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
)

// copyFn is the labels-copy callback handed to UpdateSeries throughout these
// tests. It does not actually copy anything: the labels it receives are
// returned as-is.
func copyFn(l labels.Labels) labels.Labels {
	return l
}

// fromLabelToLabels reinterprets a []labels.Label as a labels.Labels value so
// that tests can call Labels methods (such as Hash) on it. No elements are
// copied; the slice header is read back through an unsafe pointer cast.
//
// NOTE(review): these tests already pass []labels.Label values directly where
// labels.Labels is expected, so a plain conversion labels.Labels(ls) looks
// sufficient here — confirm, then drop the unsafe cast and its import.
func fromLabelToLabels(ls []labels.Label) labels.Labels {
	asLabels := (*labels.Labels)(unsafe.Pointer(&ls))
	return *asLabels
}

func TestActiveSeries_UpdateSeries(t *testing.T) {
ls1 := []labels.Label{{Name: "a", Value: "1"}}
ls2 := []labels.Label{{Name: "a", Value: "2"}}

c := NewActiveSeries()
assert.Equal(t, 0, c.Active())

c.UpdateSeries(ls1, time.Now(), copyFn)
labels1Hash := fromLabelToLabels(ls1).Hash()
labels2Hash := fromLabelToLabels(ls2).Hash()
c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
assert.Equal(t, 1, c.Active())

c.UpdateSeries(ls1, time.Now(), copyFn)
c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
assert.Equal(t, 1, c.Active())

c.UpdateSeries(ls2, time.Now(), copyFn)
c.UpdateSeries(ls2, labels2Hash, time.Now(), copyFn)
assert.Equal(t, 2, c.Active())
}

Expand All @@ -46,7 +52,7 @@ func TestActiveSeries_Purge(t *testing.T) {
c := NewActiveSeries()

for i := 0; i < len(series); i++ {
c.UpdateSeries(series[i], time.Unix(int64(i), 0), copyFn)
c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), copyFn)
}

c.Purge(time.Unix(int64(ttl+1), 0))
Expand All @@ -62,24 +68,23 @@ func TestActiveSeries_PurgeOpt(t *testing.T) {
metric := labels.NewBuilder(labels.FromStrings("__name__", "logs"))
ls1 := metric.Set("_", "ypfajYg2lsv").Labels(nil)
ls2 := metric.Set("_", "KiqbryhzUpn").Labels(nil)

c := NewActiveSeries()

now := time.Now()
c.UpdateSeries(ls1, now.Add(-2*time.Minute), copyFn)
c.UpdateSeries(ls2, now, copyFn)
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now, copyFn)
c.Purge(now)

assert.Equal(t, 1, c.Active())

c.UpdateSeries(ls1, now.Add(-1*time.Minute), copyFn)
c.UpdateSeries(ls2, now, copyFn)
c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now, copyFn)
c.Purge(now)

assert.Equal(t, 1, c.Active())

// This will *not* update the series, since there is already newer timestamp.
c.UpdateSeries(ls2, now.Add(-1*time.Minute), copyFn)
c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), copyFn)
c.Purge(now)

assert.Equal(t, 1, c.Active())
Expand All @@ -105,7 +110,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int)
wg := &sync.WaitGroup{}
start := make(chan struct{})
max := int(math.Ceil(float64(b.N) / float64(goroutines)))

labelhash := series.Hash()
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
Expand All @@ -116,7 +121,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int)

for ix := 0; ix < max; ix++ {
now = now.Add(time.Duration(ix) * time.Millisecond)
c.UpdateSeries(series, now, copyFn)
c.UpdateSeries(series, labelhash, now, copyFn)
}
}()
}
Expand All @@ -137,15 +142,17 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) {
name := nameBuf.String()

series := make([]labels.Labels, b.N)
labelhash := make([]uint64, b.N)
for s := 0; s < b.N; s++ {
series[s] = labels.Labels{{Name: name, Value: name + strconv.Itoa(s)}}
labelhash[s] = series[s].Hash()
}

now := time.Now().UnixNano()

b.ResetTimer()
for ix := 0; ix < b.N; ix++ {
c.UpdateSeries(series[ix], time.Unix(0, now+int64(ix)), copyFn)
c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), copyFn)
}
}

Expand All @@ -165,8 +172,10 @@ func benchmarkPurge(b *testing.B, twice bool) {
c := NewActiveSeries()

series := [numSeries]labels.Labels{}
labelhash := [numSeries]uint64{}
for s := 0; s < numSeries; s++ {
series[s] = labels.Labels{{Name: "a", Value: strconv.Itoa(s)}}
labelhash[s] = series[s].Hash()
}

for i := 0; i < b.N; i++ {
Expand All @@ -175,9 +184,9 @@ func benchmarkPurge(b *testing.B, twice bool) {
// Prepare series
for ix, s := range series {
if ix < numExpiresSeries {
c.UpdateSeries(s, now.Add(-time.Minute), copyFn)
c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), copyFn)
} else {
c.UpdateSeries(s, now, copyFn)
c.UpdateSeries(s, labelhash[ix], now, copyFn)
}
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// has sorted labels once hit the ingester).

// Look up a reference for this series.
ref, copiedLabels := app.GetRef(cortexpb.FromLabelAdaptersToLabels(ts.Labels))
tsLabels := cortexpb.FromLabelAdaptersToLabels(ts.Labels)
tsLabelsHash := tsLabels.Hash()
ref, copiedLabels := app.GetRef(tsLabels, tsLabelsHash)

// To find out if any sample was added to this series, we keep old value.
oldSucceededSamplesCount := succeededSamplesCount
Expand Down Expand Up @@ -1033,7 +1035,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
}

if i.cfg.ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
db.activeSeries.UpdateSeries(cortexpb.FromLabelAdaptersToLabels(ts.Labels), startAppend, func(l labels.Labels) labels.Labels {
db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, func(l labels.Labels) labels.Labels {
// we must already have copied the labels if succeededSamplesCount has been incremented.
return copiedLabels
})
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/prometheus/prometheus/tsdb/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions vendor/github.com/prometheus/prometheus/tsdb/head_append.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.