Add context and logic to querier scheduler_processor to wait for query completion before stopping #1742
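This change decouples the querier's QuerierLoop stream from the scheduler's context: the stream now runs on a fresh background context, a goroutine watches the original context for the stop signal, and an atomic flag tells the query loop to stop accepting new queries while the ones already received run to completion. Minimal sketches of the pattern, under stated assumptions, follow the affected hunks below.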

Status: Closed · wants to merge 1 commit
67 changes: 38 additions & 29 deletions pkg/querier/worker/scheduler_processor.go
@@ -25,6 +25,7 @@ import (
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/middleware"
 	"github.com/weaveworks/common/user"
+	"go.uber.org/atomic"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/health/grpc_health_v1"

@@ -90,10 +91,23 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn, address string) {

 func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
 	schedulerClient := schedulerpb.NewSchedulerForQuerierClient(conn)
+	// Create a separate context for the QuerierLoop so that cancellation of the scheduler context cannot stop the querier while it still has requests in flight.
+	queryLoopContext := context.Background()
+	querierShutdown := atomic.NewBool(false)
+
+	go func() {
+		for {
+			// Once the stop signal has been sent and the scheduler context is cancelled, tell the querier to stop taking new queries while it finishes those already received.
+			<-ctx.Done()
+			querierShutdown.Store(true)
+			level.Debug(sp.log).Log("msg", "querier shutdown received, draining remaining queries")
+			return
+		}
+	}()

 	backoff := backoff.New(ctx, processorBackoffConfig)
 	for backoff.Ongoing() {
-		c, err := schedulerClient.QuerierLoop(ctx)
+		c, err := schedulerClient.QuerierLoop(queryLoopContext)
 		if err == nil {
 			err = c.Send(&schedulerpb.QuerierToScheduler{QuerierID: sp.querierID})
 		}
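Taken out of context, the shutdown watcher added above reduces to the following minimal, runnable sketch. It is an illustration, not code from this PR: watchShutdown and schedulerCtx are hypothetical names, and only go.uber.org/atomic is assumed.

package main

import (
	"context"
	"fmt"
	"time"

	"go.uber.org/atomic"
)

// watchShutdown mirrors the goroutine added above: it flips a flag when the
// scheduler context is cancelled, without cancelling the context the
// in-flight queries run on.
func watchShutdown(schedulerCtx context.Context, shutdown *atomic.Bool) {
	go func() {
		<-schedulerCtx.Done()
		shutdown.Store(true)
	}()
}

func main() {
	schedulerCtx, cancel := context.WithCancel(context.Background())
	shutdown := atomic.NewBool(false)
	watchShutdown(schedulerCtx, shutdown)

	cancel()                          // the stop signal arrives
	time.Sleep(10 * time.Millisecond) // give the watcher a moment to run
	fmt.Println("draining:", shutdown.Load())
}

The point of the detached queryLoopContext is exactly this separation: cancelling the scheduler context flips the flag but does not tear down the stream the remaining queries run on.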
@@ -104,7 +118,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
 			continue
 		}

-		if err := sp.querierLoop(c, address); err != nil {
+		if err := sp.querierLoop(c, address, querierShutdown); err != nil {
 			level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
 			backoff.Wait()
 			continue
@@ -115,45 +129,40 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
 }

 // process loops processing requests on an established stream.
-func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string) error {
+func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, querierShutdown *atomic.Bool) error {
 	// Build a child context so we can cancel a query when the stream is closed.
 	ctx, cancel := context.WithCancel(c.Context())
 	defer cancel()

-	for {
+	// If we have not been shut down, continue to receive and process queries.
+	for !querierShutdown.Load() {
 		request, err := c.Recv()
 		if err != nil {
 			return err
 		}

-		// Handle the request on a "background" goroutine, so we go back to
-		// blocking on c.Recv(). This allows us to detect the stream closing
-		// and cancel the query. We don't actually handle queries in parallel
-		// here, as we're running in lock step with the server - each Recv is
-		// paired with a Send.
-		go func() {
-			// We need to inject user into context for sending response back.
-			ctx := user.InjectOrgID(ctx, request.UserID)
-
-			tracer := opentracing.GlobalTracer()
-			// Ignore errors here. If we cannot get parent span, we just don't create new one.
-			parentSpanContext, _ := httpgrpcutil.GetParentSpanForRequest(tracer, request.HttpRequest)
-			if parentSpanContext != nil {
-				queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext))
-				defer queueSpan.Finish()
-
-				ctx = spanCtx
-			}
-			logger := util_log.WithContext(ctx, sp.log)
-
-			sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)
-
-			// Report back to scheduler that processing of the query has finished.
-			if err := c.Send(&schedulerpb.QuerierToScheduler{}); err != nil {
-				level.Error(logger).Log("msg", "error notifying scheduler about finished query", "err", err, "addr", address)
-			}
-		}()
+		// We need to inject user into context for sending response back.
+		ctx := user.InjectOrgID(ctx, request.UserID)
+
+		tracer := opentracing.GlobalTracer()
+		// Ignore errors here. If we cannot get parent span, we just don't create new one.
+		parentSpanContext, _ := httpgrpcutil.GetParentSpanForRequest(tracer, request.HttpRequest)
+		if parentSpanContext != nil {
+			queueSpan, spanCtx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "querier_processor_runRequest", opentracing.ChildOf(parentSpanContext))
+			defer queueSpan.Finish()
+
+			ctx = spanCtx
+		}
+		logger := util_log.WithContext(ctx, sp.log)
+
+		sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)
+
+		// Report back to scheduler that processing of the query has finished.
+		if err := c.Send(&schedulerpb.QuerierToScheduler{}); err != nil {
+			level.Error(logger).Log("msg", "error notifying scheduler about finished query", "err", err, "addr", address)
+		}
 	}
+	return nil
 }

 func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) {
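Because the reworked querierLoop checks the flag only at the top of the loop, a request already received is still handled and its completion still reported back via c.Send before the loop exits; the flag only prevents pulling further work. A minimal sketch of that drain behavior, with a buffered channel standing in for the gRPC stream (drainLoop and handle are illustrative names, not part of the PR):

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// drainLoop mimics the reworked querierLoop: it stops pulling new work once
// shutdown has been set, but always finishes the request it already holds.
func drainLoop(requests <-chan int, shutdown *atomic.Bool, handle func(int)) {
	for !shutdown.Load() {
		req, ok := <-requests
		if !ok {
			return
		}
		handle(req) // runs to completion even if shutdown fires meanwhile
	}
	fmt.Println("shutdown observed, no more requests accepted")
}

func main() {
	requests := make(chan int, 2)
	requests <- 1
	requests <- 2

	shutdown := atomic.NewBool(false)
	drainLoop(requests, shutdown, func(req int) {
		fmt.Println("handled request", req)
		shutdown.Store(true) // stop signal lands while this request runs
	})
	// Prints "handled request 1" and then the shutdown line; request 2 is never pulled.
}

One consequence that also holds for the PR: if the stop signal arrives while the loop is blocked in Recv, the next request is still accepted and processed before the flag is rechecked.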