Skip to content

Commit 2d1604c

Browse files
alvinlin123, bboreham, aknuds1
authored
Fix bug where querier may not be able to achieve max-concurrent (#4417)
* Fix bug where querier may not be able to achieve max-concurrent
  Signed-off-by: Alvin Lin <alvinlin@amazon.com>
* Update change log
  Signed-off-by: Alvin Lin <alvinlin@amazon.com>
* Update CHANGELOG.md
  Co-authored-by: Bryan Boreham <bjboreham@gmail.com>
  Signed-off-by: Alvin Lin <alvinlin@amazon.com>
* Update pkg/querier/worker/worker_test.go
  Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
  Signed-off-by: Alvin Lin <alvinlin@amazon.com>
* Address PR comments
  Signed-off-by: Alvin Lin <alvinlin@amazon.com>

Co-authored-by: Bryan Boreham <bjboreham@gmail.com>
Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
1 parent 81ac414 commit 2d1604c

File tree

3 files changed

+46
-31
lines changed

3 files changed

+46
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,7 @@
4545
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
4646
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
4747
* [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366
48-
48+
* [BUGFIX] Querier: After query-frontend restart, querier may have lower than configured concurrency. #4417
4949

5050
## 1.10.0 / 2021-08-03
5151

pkg/querier/worker/worker.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -208,6 +208,8 @@ func (w *querierWorker) AddressRemoved(address string) {
208208
w.mu.Lock()
209209
p := w.managers[address]
210210
delete(w.managers, address)
211+
// Called with lock.
212+
w.resetConcurrency()
211213
w.mu.Unlock()
212214

213215
if p != nil {

pkg/querier/worker/worker_test.go

Lines changed: 43 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package worker
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
67
"testing"
78
"time"
89

@@ -17,46 +18,52 @@ import (
1718

1819
func TestResetConcurrency(t *testing.T) {
1920
tests := []struct {
20-
name string
21-
parallelism int
22-
maxConcurrent int
23-
numTargets int
24-
expectedConcurrency int
21+
name string
22+
parallelism int
23+
maxConcurrent int
24+
numTargets int
25+
expectedConcurrency int
26+
expectedConcurrencyAfterTargetRemoval int
2527
}{
2628
{
27-
name: "Test create at least one processor per target",
28-
parallelism: 0,
29-
maxConcurrent: 0,
30-
numTargets: 2,
31-
expectedConcurrency: 2,
29+
name: "Test create at least one processor per target",
30+
parallelism: 0,
31+
maxConcurrent: 0,
32+
numTargets: 2,
33+
expectedConcurrency: 2,
34+
expectedConcurrencyAfterTargetRemoval: 1,
3235
},
3336
{
34-
name: "Test parallelism per target",
35-
parallelism: 4,
36-
maxConcurrent: 0,
37-
numTargets: 2,
38-
expectedConcurrency: 8,
37+
name: "Test parallelism per target",
38+
parallelism: 4,
39+
maxConcurrent: 0,
40+
numTargets: 2,
41+
expectedConcurrency: 8,
42+
expectedConcurrencyAfterTargetRemoval: 4,
3943
},
4044
{
41-
name: "Test Total Parallelism with a remainder",
42-
parallelism: 1,
43-
maxConcurrent: 7,
44-
numTargets: 4,
45-
expectedConcurrency: 7,
45+
name: "Test Total Parallelism with a remainder",
46+
parallelism: 1,
47+
maxConcurrent: 7,
48+
numTargets: 4,
49+
expectedConcurrency: 7,
50+
expectedConcurrencyAfterTargetRemoval: 7,
4651
},
4752
{
48-
name: "Test Total Parallelism dividing evenly",
49-
parallelism: 1,
50-
maxConcurrent: 6,
51-
numTargets: 2,
52-
expectedConcurrency: 6,
53+
name: "Test Total Parallelism dividing evenly",
54+
parallelism: 1,
55+
maxConcurrent: 6,
56+
numTargets: 2,
57+
expectedConcurrency: 6,
58+
expectedConcurrencyAfterTargetRemoval: 6,
5359
},
5460
{
55-
name: "Test Total Parallelism at least one worker per target",
56-
parallelism: 1,
57-
maxConcurrent: 3,
58-
numTargets: 6,
59-
expectedConcurrency: 6,
61+
name: "Test Total Parallelism at least one worker per target",
62+
parallelism: 1,
63+
maxConcurrent: 3,
64+
numTargets: 6,
65+
expectedConcurrency: 6,
66+
expectedConcurrencyAfterTargetRemoval: 5,
6067
},
6168
}
6269

@@ -82,6 +89,12 @@ func TestResetConcurrency(t *testing.T) {
8289
return getConcurrentProcessors(w)
8390
})
8491

92+
// now we remove an address and ensure we still have the expected concurrency
93+
w.AddressRemoved(fmt.Sprintf("127.0.0.1:%d", rand.Intn(tt.numTargets)))
94+
test.Poll(t, 250*time.Millisecond, tt.expectedConcurrencyAfterTargetRemoval, func() interface{} {
95+
return getConcurrentProcessors(w)
96+
})
97+
8598
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), w))
8699
assert.Equal(t, 0, getConcurrentProcessors(w))
87100
})

0 commit comments

Comments
 (0)