Skip to content

Commit f0c3a7b

Browse files
committed
Opmize GetLabels and GetLabels values from store gateway
Signed-off-by: alanprot <alanprot@gmail.com>
1 parent a899c65 commit f0c3a7b

File tree

4 files changed

+65
-7
lines changed

4 files changed

+65
-7
lines changed

pkg/distributor/distributor.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,6 @@ const (
6464
typeMetadata = "metadata"
6565

6666
instanceIngestionRateTickInterval = time.Second
67-
68-
// mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
69-
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
70-
mergeSlicesParallelism = 8
7167
)
7268

7369
// Distributor is a storage.SampleAppender and a client.Querier which
@@ -973,7 +969,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
973969
for i, resp := range resps {
974970
values[i] = resp.([]string)
975971
}
976-
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
972+
r := util.MergeSlicesParallel(util.DefaultMergeSlicesParallelism, values...)
977973
span.SetTag("result_length", len(r))
978974
return r, nil
979975
}
@@ -1043,7 +1039,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
10431039
for i, resp := range resps {
10441040
values[i] = resp.([]string)
10451041
}
1046-
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
1042+
r := util.MergeSlicesParallel(util.DefaultMergeSlicesParallelism, values...)
10471043
span.SetTag("result_length", len(r))
10481044

10491045
return r, nil

pkg/querier/blocks_store_queryable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func (q *blocksStoreQuerier) LabelValues(ctx context.Context, name string, match
407407
return nil, nil, err
408408
}
409409

410-
return strutil.MergeSlices(resValueSets...), resWarnings, nil
410+
return util.MergeSlicesParallel(util.DefaultMergeSlicesParallelism, resValueSets...), resWarnings, nil
411411
}
412412

413413
func (q *blocksStoreQuerier) Close() error {

pkg/querier/blocks_store_queryable_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
892892
}
893893
}
894894

895+
func BenchmarkBlocksStoreQuerier_Labels(b *testing.B) {
896+
const (
897+
minT = int64(10)
898+
maxT = int64(20)
899+
)
900+
ctx := user.InjectOrgID(context.Background(), "user-1")
901+
reg := prometheus.NewPedanticRegistry()
902+
blocks := bucketindex.Blocks{}
903+
resps := map[BlocksStoreClient][]ulid.ULID{}
904+
for i := 0; i < 500; i++ {
905+
b := &bucketindex.Block{ID: ulid.MustNew(uint64(i), nil)}
906+
blocks = append(blocks, b)
907+
values := []string{}
908+
for j := i; j < i+300; j++ {
909+
values = append(values, fmt.Sprintf("Value_%v", j))
910+
}
911+
resps[&storeGatewayClientMock{
912+
remoteAddr: "1.1.1.1",
913+
mockedLabelValuesResponse: &storepb.LabelValuesResponse{
914+
Values: values,
915+
Warnings: []string{},
916+
Hints: mockValuesHints(b.ID),
917+
},
918+
}] = []ulid.ULID{b.ID}
919+
}
920+
921+
stores := &blocksStoreSetMock{mockedResponses: []interface{}{resps}, rotateMockResults: true}
922+
finder := &blocksFinderMock{}
923+
924+
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
925+
926+
q := &blocksStoreQuerier{
927+
minT: minT,
928+
maxT: maxT,
929+
finder: finder,
930+
stores: stores,
931+
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
932+
logger: log.NewNopLogger(),
933+
metrics: newBlocksStoreQueryableMetrics(reg),
934+
limits: &blocksStoreLimitsMock{},
935+
}
936+
937+
b.ResetTimer()
938+
b.ReportAllocs()
939+
for i := 0; i < b.N; i++ {
940+
_, _, err := q.LabelValues(ctx, labels.MetricName)
941+
require.NoError(b, err)
942+
}
943+
}
944+
895945
func TestBlocksStoreQuerier_Labels(t *testing.T) {
896946
t.Parallel()
897947

@@ -1619,6 +1669,8 @@ type blocksStoreSetMock struct {
16191669

16201670
mockedResponses []interface{}
16211671
nextResult int
1672+
1673+
rotateMockResults bool
16221674
}
16231675

16241676
func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string, _ map[ulid.ULID]map[string]int) (map[BlocksStoreClient][]ulid.ULID, error) {
@@ -1629,6 +1681,10 @@ func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.U
16291681
res := m.mockedResponses[m.nextResult]
16301682
m.nextResult++
16311683

1684+
if m.rotateMockResults {
1685+
m.nextResult = m.nextResult % len(m.mockedResponses)
1686+
}
1687+
16321688
if err, ok := res.(error); ok {
16331689
return nil, err
16341690
}

pkg/util/strings.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ import (
77
"github.com/bboreham/go-loser"
88
)
99

10+
const (
11+
// DefaultMergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
12+
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
13+
DefaultMergeSlicesParallelism = 8
14+
)
15+
1016
// StringsContain returns true if the search value is within the list of input values.
1117
func StringsContain(values []string, search string) bool {
1218
for _, v := range values {

0 commit comments

Comments
 (0)