Skip to content

Commit e1cef90

Browse files
authored
add context cancellation checks on merging GetLabel slices (#5837)
* add context cancellation checks on merging GetLabel slices Signed-off-by: Erlan Zholdubai uulu <erlanz@amazon.com> * add context cancellation checks on merging GetLabel slices, update changelog Signed-off-by: Erlan Zholdubai uulu <erlanz@amazon.com> --------- Signed-off-by: Erlan Zholdubai uulu <erlanz@amazon.com>
1 parent ec3fb1f commit e1cef90

File tree

4 files changed

+38
-13
lines changed

4 files changed

+38
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808
3434
* [ENHANCEMENT] Querier: Add context error check when converting Metrics to SeriesSet for GetSeries on distributorQuerier. #5827
3535
* [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
36+
* [ENHANCEMENT] Querier: Add context error check when merging slices from ingesters for GetLabel operations. #5837
3637
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
3738
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
3839
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734

pkg/distributor/distributor.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -973,7 +973,10 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
973973
for i, resp := range resps {
974974
values[i] = resp.([]string)
975975
}
976-
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
976+
r, err := util.MergeSlicesParallel(ctx, mergeSlicesParallelism, values...)
977+
if err != nil {
978+
return nil, err
979+
}
977980
span.SetTag("result_length", len(r))
978981
return r, nil
979982
}
@@ -1043,7 +1046,10 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
10431046
for i, resp := range resps {
10441047
values[i] = resp.([]string)
10451048
}
1046-
r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
1049+
r, err := util.MergeSlicesParallel(ctx, mergeSlicesParallelism, values...)
1050+
if err != nil {
1051+
return nil, err
1052+
}
10471053
span.SetTag("result_length", len(r))
10481054

10491055
return r, nil

pkg/util/strings.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package util
22

33
import (
4+
"context"
45
"sync"
56
"unsafe"
67

@@ -37,17 +38,18 @@ func StringsClone(s string) string {
3738

3839
// MergeSlicesParallel merge sorted slices in parallel
3940
// using the MergeSortedSlices function
40-
func MergeSlicesParallel(parallelism int, a ...[]string) []string {
41+
func MergeSlicesParallel(ctx context.Context, parallelism int, a ...[]string) ([]string, error) {
4142
if parallelism <= 1 {
42-
return MergeSortedSlices(a...)
43+
return MergeSortedSlices(ctx, a...)
4344
}
4445
if len(a) == 0 {
45-
return nil
46+
return nil, nil
4647
}
4748
if len(a) == 1 {
48-
return a[0]
49+
return a[0], nil
4950
}
5051
c := make(chan []string, len(a))
52+
errCh := make(chan error, 1)
5153
wg := sync.WaitGroup{}
5254
var r [][]string
5355
p := min(parallelism, len(a)/2)
@@ -57,21 +59,31 @@ func MergeSlicesParallel(parallelism int, a ...[]string) []string {
5759
wg.Add(1)
5860
go func(i int) {
5961
m := min(len(a), i+batchSize)
60-
c <- MergeSortedSlices(a[i:m]...)
62+
r, e := MergeSortedSlices(ctx, a[i:m]...)
63+
if e != nil {
64+
errCh <- e
65+
wg.Done()
66+
return
67+
}
68+
c <- r
6169
wg.Done()
6270
}(i)
6371
}
6472

6573
go func() {
6674
wg.Wait()
6775
close(c)
76+
close(errCh)
6877
}()
6978

79+
if err := <-errCh; err != nil {
80+
return nil, err
81+
}
7082
for s := range c {
7183
r = append(r, s)
7284
}
7385

74-
return MergeSortedSlices(r...)
86+
return MergeSortedSlices(ctx, r...)
7587
}
7688

7789
func NewStringListIter(s []string) *StringListIter {
@@ -98,9 +110,9 @@ var MAX_STRING = string([]byte{0xff})
98110

99111
// MergeSortedSlices merges a set of sorted string slices into a single ones
100112
// while removing all duplicates.
101-
func MergeSortedSlices(a ...[]string) []string {
113+
func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {
102114
if len(a) == 1 {
103-
return a[0]
115+
return a[0], nil
104116
}
105117
its := make([]*StringListIter, 0, len(a))
106118
sumLengh := 0
@@ -111,16 +123,19 @@ func MergeSortedSlices(a ...[]string) []string {
111123
lt := loser.New(its, MAX_STRING)
112124

113125
if sumLengh == 0 {
114-
return []string{}
126+
return []string{}, nil
115127
}
116128

117129
r := make([]string, 0, sumLengh*2/10)
118130
var current string
119131
for lt.Next() {
132+
if ctx.Err() != nil {
133+
return nil, ctx.Err()
134+
}
120135
if lt.At() != current {
121136
current = lt.At()
122137
r = append(r, current)
123138
}
124139
}
125-
return r
140+
return r, nil
126141
}

pkg/util/strings_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package util
22

33
import (
4+
"context"
45
"fmt"
56
"math/rand"
67
"sort"
@@ -96,12 +97,14 @@ func BenchmarkMergeSlicesParallel(b *testing.B) {
9697
b.ReportAllocs()
9798
b.ResetTimer()
9899
var r []string
100+
var err error
99101
for i := 0; i < b.N; i++ {
100102
if p == usingMap {
101103
r = sortUsingMap(input...)
102104
require.NotEmpty(b, r)
103105
} else {
104-
r = MergeSlicesParallel(int(p), input...)
106+
r, err = MergeSlicesParallel(context.Background(), int(p), input...)
107+
require.NoError(b, err)
105108
require.NotEmpty(b, r)
106109
}
107110
}

0 commit comments

Comments
 (0)