Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock in disconnecting querier #5063

Merged
merged 5 commits into from
Jan 10, 2022
Rate limit · GitHub

Whoa there!

You have triggered an abuse detection mechanism.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Rate limit · GitHub

Whoa there!

You have triggered an abuse detection mechanism.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

cyriltovena committed Jan 7, 2022
commit bad345103439598bef46003bb9a32fe03e343fe6
19 changes: 8 additions & 11 deletions pkg/lokifrontend/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
@@ -61,8 +61,8 @@ func TestFrontend(t *testing.T) {
assert.Equal(t, "Hello World", string(body))
}

-testFrontend(t, defaultFrontendConfig(), handler, test, false, nil, nil)
-testFrontend(t, defaultFrontendConfig(), handler, test, true, nil, nil)
+testFrontend(t, defaultFrontendConfig(), handler, test, false, nil)
+testFrontend(t, defaultFrontendConfig(), handler, test, true, nil)
}

func TestFrontendPropagateTrace(t *testing.T) {
@@ -111,8 +111,8 @@ func TestFrontendPropagateTrace(t *testing.T) {
// Query should do one call.
assert.Equal(t, traceID, <-observedTraceID)
}
-testFrontend(t, defaultFrontendConfig(), handler, test, false, nil, nil)
-testFrontend(t, defaultFrontendConfig(), handler, test, true, nil, nil)
+testFrontend(t, defaultFrontendConfig(), handler, test, false, nil)
+testFrontend(t, defaultFrontendConfig(), handler, test, true, nil)
}

func TestFrontendCheckReady(t *testing.T) {
@@ -176,9 +176,9 @@ func TestFrontendCancel(t *testing.T) {
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int32(1), tries.Load())
}
-testFrontend(t, defaultFrontendConfig(), handler, test, false, nil, nil)
+testFrontend(t, defaultFrontendConfig(), handler, test, false, nil)
tries.Store(0)
-testFrontend(t, defaultFrontendConfig(), handler, test, true, nil, nil)
+testFrontend(t, defaultFrontendConfig(), handler, test, true, nil)
}

func TestFrontendMetricsCleanup(t *testing.T) {
@@ -220,15 +220,12 @@ func TestFrontendMetricsCleanup(t *testing.T) {
`), "cortex_query_frontend_queue_length"))
}

-testFrontend(t, defaultFrontendConfig(), handler, test, matchMaxConcurrency, nil, reg)
+testFrontend(t, defaultFrontendConfig(), handler, test, matchMaxConcurrency, reg)
}
}

-func testFrontend(t *testing.T, config Config, handler http.Handler, test func(addr string, frontend *Frontend), matchMaxConcurrency bool, l log.Logger, reg prometheus.Registerer) {
+func testFrontend(t *testing.T, config Config, handler http.Handler, test func(addr string, frontend *Frontend), matchMaxConcurrency bool, reg prometheus.Registerer) {
logger := log.NewNopLogger()
-if l != nil {
-logger = l
-}

var workerConfig querier_worker.Config
flagext.DefaultValues(&workerConfig)
16 changes: 7 additions & 9 deletions pkg/lokifrontend/frontend/v1/queue_test.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb"
)

-func setupFrontend(t *testing.T, config Config) (*Frontend, error) {
+func setupFrontend(t *testing.T, config Config) *Frontend {
logger := log.NewNopLogger()

frontend, err := New(config, limits{queriers: 3}, logger, nil)
@@ -29,7 +29,7 @@ func setupFrontend(t *testing.T, config Config) (*Frontend, error) {
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), frontend))
})
-return frontend, nil
+return frontend
}

func testReq(ctx context.Context, reqID, user string) *request {
@@ -51,8 +51,7 @@ func TestDequeuesExpiredRequests(t *testing.T) {
config.MaxOutstandingPerTenant = 10
userID := "1"

-f, err := setupFrontend(t, config)
-require.NoError(t, err)
+f := setupFrontend(t, config)

ctx := user.InjectOrgID(context.Background(), userID)
expired, cancel := context.WithCancel(ctx)
@@ -77,7 +76,7 @@ func TestDequeuesExpiredRequests(t *testing.T) {
defer cancel2()

m := &processServerMock{ctx: ctx2, querierID: "querier"}
-err = f.Process(m)
+err := f.Process(m)
require.EqualError(t, err, context.DeadlineExceeded.Error())

// Verify that only non-expired requests were forwarded to querier.
@@ -98,14 +97,13 @@ func TestRoundRobinQueues(t *testing.T) {

config.MaxOutstandingPerTenant = requests

-f, err := setupFrontend(t, config)
-require.NoError(t, err)
+f := setupFrontend(t, config)

for i := 0; i < requests; i++ {
userID := fmt.Sprint(i / tenants)
ctx := user.InjectOrgID(context.Background(), userID)

-err = f.queueRequest(ctx, testReq(ctx, fmt.Sprintf("%d", i), userID))
+err := f.queueRequest(ctx, testReq(ctx, fmt.Sprintf("%d", i), userID))
require.NoError(t, err)
}

@@ -115,7 +113,7 @@ func TestRoundRobinQueues(t *testing.T) {
defer cancel()

m := &processServerMock{ctx: ctx, querierID: "querier"}
-err = f.Process(m)
+err := f.Process(m)
require.EqualError(t, err, context.DeadlineExceeded.Error())

require.Len(t, m.requests, requests)
17 changes: 8 additions & 9 deletions pkg/lokifrontend/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
@@ -13,8 +13,6 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
-"github.com/grafana/loki/pkg/util/test"
-"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
@@ -23,11 +21,12 @@ import (

"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/scheduler/schedulerpb"
+"github.com/grafana/loki/pkg/util/test"
)

const testFrontendWorkerConcurrency = 5

-func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
+func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
l, err := net.Listen("tcp", "")
require.NoError(t, err)

@@ -48,7 +47,7 @@ func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc f

// logger := log.NewLogfmtLogger(os.Stdout)
logger := log.NewNopLogger()
-f, err := NewFrontend(cfg, nil, logger, reg)
+f, err := NewFrontend(cfg, nil, logger, nil)
require.NoError(t, err)

frontendv2pb.RegisterFrontendForQuerierServer(server, f)
@@ -99,7 +98,7 @@ func TestFrontendBasicWorkflow(t *testing.T) {
userID = "test"
)

-f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
+f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
// We cannot call QueryResult directly, as Frontend is not yet waiting for the response.
// It first needs to be told that enqueuing has succeeded.
go sendResponseWithDelay(f, 100*time.Millisecond, userID, msg.QueryID, &httpgrpc.HTTPResponse{
@@ -124,7 +123,7 @@ func TestFrontendRetryEnqueue(t *testing.T) {
userID = "test"
)

-f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
+f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
fail := failures.Dec()
if fail >= 0 {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
@@ -142,7 +141,7 @@ func TestFrontendRetryEnqueue(t *testing.T) {
}

func TestFrontendEnqueueFailure(t *testing.T) {
-f, _ := setupFrontend(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
+f, _ := setupFrontend(t, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
})

@@ -152,7 +151,7 @@ func TestFrontendEnqueueFailure(t *testing.T) {
}

func TestFrontendCancellation(t *testing.T) {
-f, ms := setupFrontend(t, nil, nil)
+f, ms := setupFrontend(t, nil)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
@@ -178,7 +177,7 @@ func TestFrontendCancellation(t *testing.T) {
}

func TestFrontendFailedCancellation(t *testing.T) {
-f, ms := setupFrontend(t, nil, nil)
+f, ms := setupFrontend(t, nil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
3 changes: 2 additions & 1 deletion pkg/querier/worker/frontend_processor_test.go
Original file line number Diff line number Diff line change
@@ -6,11 +6,12 @@ import (
"time"

"github.com/go-kit/log"
-"github.com/grafana/loki/pkg/util/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
+
+"github.com/grafana/loki/pkg/util/test"
)

func TestRecvFailDoesntCancelProcess(t *testing.T) {
3 changes: 2 additions & 1 deletion pkg/querier/worker/worker_test.go
Original file line number Diff line number Diff line change
@@ -8,10 +8,11 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
-"github.com/grafana/loki/pkg/util/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
+
+"github.com/grafana/loki/pkg/util/test"
)

func TestResetConcurrency(t *testing.T) {
4 changes: 2 additions & 2 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
@@ -282,13 +282,13 @@ func TestContextCond(t *testing.T) {
})
}

-func assertChanReceived(t *testing.T, c chan struct{}, timeout time.Duration, msg string, args ...interface{}) {
+func assertChanReceived(t *testing.T, c chan struct{}, timeout time.Duration, msg string) {
t.Helper()

select {
case <-c:
case <-time.After(timeout):
-t.Fatalf(msg, args...)
+t.Fatalf(msg)
}
}