Ensure queries are cancelled correctly via the frontend. #1508

Merged: 2 commits, Jul 17, 2019
93 changes: 36 additions & 57 deletions pkg/querier/frontend/frontend.go
@@ -210,7 +210,10 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *ProcessRequest) (*ProcessResponse, error) {
 	request := &request{
 		request:     req,
 		originalCtx: ctx,
-		// Buffer of 1 to ensure response can be written even if client has gone away.
+
+		// Buffer of 1 to ensure response can be written by the server side
+		// of the Process stream, even if this goroutine goes away due to
+		// client context cancellation.
 		err:      make(chan error, 1),
 		response: make(chan *ProcessResponse, 1),
 	}
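
The buffer-of-1 comment above encodes a standard Go idiom: a send on a channel with capacity 1 never blocks if at most one value is ever sent, so the responding goroutine can finish even when the requester has already returned. A minimal, self-contained sketch of the idiom; the `do` helper and its names are illustrative, not part of the PR:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// do runs some work on a goroutine. The buffer of 1 lets the worker
// complete its send even if the requester has already given up.
func do(ctx context.Context) (string, error) {
	out := make(chan string, 1) // buffered: the send below can never block

	go func() {
		time.Sleep(50 * time.Millisecond) // simulate work
		out <- "done"                     // completes even if nobody is listening
	}()

	select {
	case v := <-out:
		return v, nil
	case <-ctx.Done():
		// We return early; the worker goroutine still finishes its send
		// into the buffer and exits, instead of leaking forever.
		return "", ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err := do(ctx)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

This is the same reason the `errs` and `resps` channels in the rewritten Process loop below are created with capacity 1.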
@@ -262,80 +265,56 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *ProcessRequest) (*ProcessResponse, error) {

 // Process allows backends to pull requests from the frontend.
 func (f *Frontend) Process(server Frontend_ProcessServer) error {
-	var (
-		sendChan = make(chan *ProcessRequest)
-		recvChan = make(chan *ProcessResponse, 1)
-
-		// Need buffer of 2 so goroutines reading/writing to stream don't hang
-		// around when stream dies.
-		errChan = make(chan error, 2)
-	)
-
-	// If the stream from the querier is canceled, ping the condition to unblock.
-	// This is done once, here (instead of in getNextRequest) as we expect calls
-	// to Process to process many requests.
+	// If the downstream request (from querier -> frontend) is cancelled,
+	// we need to ping the condition variable to unblock getNextRequest.
+	// Ideally we'd have ctx-aware condition variables...
 	go func() {
 		<-server.Context().Done()
 		f.cond.Broadcast()
 	}()
 
-	// Use a pair of goroutines to read/write from the stream and send to channels,
-	// so we can use selects to also wait on the cancellation of the request context.
-	// These goroutines will error out when the stream returns.
-	go func() {
-		for {
-			var req *ProcessRequest
-			select {
-			case req = <-sendChan:
-			case <-server.Context().Done():
-				return
-			}
-
-			err := server.Send(req)
-			if err != nil {
-				errChan <- err
-				return
-			}
-		}
-	}()
-
-	go func() {
-		for {
-			resp, err := server.Recv()
-			if err == nil {
-				recvChan <- resp
-			} else {
-				errChan <- err
-				return
-			}
-		}
-	}()
-
 	for {
-		request, err := f.getNextRequest(server.Context())
+		req, err := f.getNextRequest(server.Context())
 		if err != nil {
 			return err
 		}
 
-		originalCtx := request.originalCtx
+		// Handle the stream sending & receiving on a goroutine so we can
+		// monitor the contexts in a select and cancel things appropriately.
+		resps := make(chan *ProcessResponse, 1)
+		errs := make(chan error, 1)
+		go func() {
+			err = server.Send(req.request)
+			if err != nil {
+				errs <- err
+				return
+			}
+
+			resp, err := server.Recv()
+			if err != nil {
+				errs <- err
+				return
+			}
+
+			resps <- resp
+		}()
 
 		select {
-		case sendChan <- request.request:
-		case err := <-errChan:
-			request.err <- err
-			return err
-		case <-originalCtx.Done():
-			return originalCtx.Err()
-		}
+		// If the upstream request is cancelled, we need to cancel the
+		// downstream req. The only way we can do that is to close the stream.
+		// The worker client expects these semantics.
+		case <-req.originalCtx.Done():
+			return req.originalCtx.Err()
 
-		select {
-		case resp := <-recvChan:
-			request.response <- resp
-		case err := <-errChan:
-			request.err <- err
+		// If there was an error handling this request due to network IO,
+		// then error out this upstream request _and_ the stream.
+		case err := <-errs:
+			req.err <- err
 			return err
-		case <-originalCtx.Done():
-			return originalCtx.Err()
+
+		// Happy path: propagate the response.
+		case resp := <-resps:
+			req.response <- resp
 		}
 	}
 }

Review thread on the cancellation comment in the select above:

Contributor: This means if 5 requests are being handled on this stream, all of them will error out?

gouthamve (Contributor, Jul 16, 2019): This means a user can DOS the service by just sending requests and immediately cancelling them.

Contributor (PR author): There should only ever be one concurrent request per stream, so I don't think this is a problem.

Contributor: Right!
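
The wish for "ctx-aware condition variables" in the added comment refers to a standard-library gap: sync.Cond.Wait cannot be interrupted by a context, so the PR wakes all waiters via Broadcast when the stream context ends and relies on them re-checking the context. A reduced sketch of that workaround, using a hypothetical queue in place of the frontend's internals:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// queue is a hypothetical FIFO whose Pop blocks until an item arrives
// or ctx is cancelled, the same shape as the frontend's getNextRequest.
type queue struct {
	mtx   sync.Mutex
	cond  *sync.Cond
	items []string
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mtx)
	return q
}

func (q *queue) Push(s string) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.items = append(q.items, s)
	q.cond.Signal()
}

// Pop waits for an item; cancellation is observed because the caller
// arranges for Broadcast to fire when ctx is done (see main).
func (q *queue) Pop(ctx context.Context) (string, error) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	for len(q.items) == 0 {
		if err := ctx.Err(); err != nil {
			return "", err // re-check ctx every time we are woken
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

func main() {
	q := newQueue()
	ctx, cancel := context.WithCancel(context.Background())

	// The one-per-stream helper from the PR: wake all waiters on cancellation.
	go func() {
		<-ctx.Done()
		q.cond.Broadcast()
	}()

	go func() {
		time.Sleep(20 * time.Millisecond)
		cancel()
	}()

	_, err := q.Pop(ctx)
	fmt.Println(err) // context.Canceled
}
```

Note the broadcast goroutine is started once per Process call rather than per request, matching the comment removed from the old code.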
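Stripped of the Cortex types, the rewritten Process loop is a reusable shape: run one paired Send/Recv on a goroutine with single-buffered result channels, then select across caller cancellation, transport error, and the happy path. A sketch under those assumptions, where `exchange` is a hypothetical stand-in for the gRPC round trip:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// exchange stands in for the paired server.Send/server.Recv round trip
// that Process performs per request.
type exchange func() (string, error)

// roundTrip runs one exchange while honoring caller cancellation. The
// buffers of 1 let the goroutine finish and exit even if we return early.
func roundTrip(ctx context.Context, ex exchange) (string, error) {
	resps := make(chan string, 1)
	errs := make(chan error, 1)

	go func() {
		resp, err := ex()
		if err != nil {
			errs <- err
			return
		}
		resps <- resp
	}()

	select {
	// Caller gave up: the only way to abandon the in-flight exchange is to
	// return (in the PR, returning here closes the gRPC stream).
	case <-ctx.Done():
		return "", ctx.Err()

	// Transport error: fail this request and the stream.
	case err := <-errs:
		return "", err

	// Happy path.
	case resp := <-resps:
		return resp, nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate an upstream request that is cancelled immediately

	_, err := roundTrip(ctx, func() (string, error) {
		time.Sleep(10 * time.Millisecond) // the exchange is still in flight
		return "ok", nil
	})
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```

The real Process additionally pushes the result back to the waiting RoundTripGRPC through the request's own buffered err/response channels.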
33 changes: 33 additions & 0 deletions pkg/querier/frontend/frontend_test.go
@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/go-kit/kit/log"
@@ -124,6 +125,37 @@ func TestFrontendPropagateTrace(t *testing.T) {
 	testFrontend(t, handler, test)
 }
 
+// TestFrontendCancel ensures that when client requests are cancelled,
+// the underlying query is correctly cancelled _and not retried_.
+func TestFrontendCancel(t *testing.T) {
+	var tries int32
+	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		<-r.Context().Done()
+		atomic.AddInt32(&tries, 1)
+	})
+	test := func(addr string) {
+		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
+		require.NoError(t, err)
+		err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req)
+		require.NoError(t, err)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		req = req.WithContext(ctx)
+
+		go func() {
+			time.Sleep(100 * time.Millisecond)
+			cancel()
+		}()
+
+		_, err = http.DefaultClient.Do(req)
+		require.Error(t, err)
+
+		time.Sleep(100 * time.Millisecond)
+		assert.Equal(t, int32(1), atomic.LoadInt32(&tries))
+	}
+	testFrontend(t, handler, test)
+}
+
 func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
 	logger := log.NewNopLogger() //log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

@@ -133,6 +165,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
 	)
 	flagext.DefaultValues(&config, &workerConfig)
 	config.SplitQueriesByDay = true
+	workerConfig.Parallelism = 1
 
 	// localhost:0 prevents firewall warnings on Mac OS X.
 	grpcListen, err := net.Listen("tcp", "localhost:0")
60 changes: 35 additions & 25 deletions pkg/querier/frontend/worker.go
@@ -173,7 +173,7 @@ func (w *worker) runOne(ctx context.Context, client FrontendClient) {
 			continue
 		}
 
-		if err := w.process(ctx, c); err != nil {
+		if err := w.process(c); err != nil {
 			level.Error(w.log).Log("msg", "error processing requests", "err", err)
 			backoff.Wait()
 			continue
@@ -184,41 +184,51 @@ func (w *worker) runOne(ctx context.Context, client FrontendClient) {
 }
 
 // process loops processing requests on an established stream.
-func (w *worker) process(ctx context.Context, c Frontend_ProcessClient) error {
+func (w *worker) process(c Frontend_ProcessClient) error {
+	// Build a child context so we can cancel queries when the stream is closed.
+	ctx, cancel := context.WithCancel(c.Context())
+	defer cancel()
+
 	for {
 		request, err := c.Recv()
 		if err != nil {
 			return err
 		}
 
-		response, err := w.server.Handle(ctx, request.HttpRequest)
-		if err != nil {
-			var ok bool
-			response, ok = httpgrpc.HTTPResponseFromError(err)
-			if !ok {
-				response = &httpgrpc.HTTPResponse{
-					Code: http.StatusInternalServerError,
-					Body: []byte(err.Error()),
+		// Handle the request on a "background" goroutine, so we go back to
+		// blocking on c.Recv(). This allows us to detect the stream closing
+		// and cancel the query. We don't actually handle queries in parallel
+		// here, as we're running in lock step with the server - each Recv is
+		// paired with a Send.
+		go func() {
+			response, err := w.server.Handle(ctx, request.HttpRequest)
+			if err != nil {
+				var ok bool
+				response, ok = httpgrpc.HTTPResponseFromError(err)
+				if !ok {
+					response = &httpgrpc.HTTPResponse{
+						Code: http.StatusInternalServerError,
+						Body: []byte(err.Error()),
+					}
 				}
 			}
-		}
 
-		if len(response.Body) >= w.cfg.GRPCClientConfig.MaxSendMsgSize {
-			errMsg := fmt.Sprintf("the response is larger than the max (%d vs %d)", len(response.Body), w.cfg.GRPCClientConfig.MaxSendMsgSize)
-
-			// This makes sure the request is not retried, else a 500 is sent and we retry the large query again.
-			response = &httpgrpc.HTTPResponse{
-				Code: http.StatusRequestEntityTooLarge,
-				Body: []byte(errMsg),
+			// Ensure responses that are too big are not retried.
+			if len(response.Body) >= w.cfg.GRPCClientConfig.MaxSendMsgSize {
+				errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), w.cfg.GRPCClientConfig.MaxSendMsgSize)
+				response = &httpgrpc.HTTPResponse{
+					Code: http.StatusRequestEntityTooLarge,
+					Body: []byte(errMsg),
+				}
+				level.Error(w.log).Log("msg", "error processing query", "err", errMsg)
 			}
-			level.Error(w.log).Log("msg", "error processing query", "err", errMsg)
-		}
 
-		if err := c.Send(&ProcessResponse{
-			HttpResponse: response,
-		}); err != nil {
-			return err
-		}
+			if err := c.Send(&ProcessResponse{
+				HttpResponse: response,
+			}); err != nil {
+				level.Error(w.log).Log("msg", "error processing requests", "err", err)
+			}
+		}()
 	}
 }
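
The worker-side change hinges on two ideas spelled out in the new comments: derive the per-query context from the stream's context, and handle each request on a goroutine so the loop returns to Recv and notices the stream closing. A compressed sketch of that shape; `stream` and `fakeStream` here are stand-ins for the generated Frontend_ProcessClient, not the PR's code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stream is a stand-in for the generated gRPC client stream.
type stream interface {
	Context() context.Context
	Recv() (string, error)
	Send(string) error
}

// process mirrors the worker loop: every query runs under a context that
// dies with the stream, and handling happens on a goroutine so the loop
// returns to Recv() immediately and notices the stream closing.
func process(c stream, handle func(context.Context, string) string) error {
	ctx, cancel := context.WithCancel(c.Context())
	defer cancel() // stream gone: cancel any in-flight query

	for {
		req, err := c.Recv()
		if err != nil {
			return err // e.g. the frontend closed the stream on cancel
		}

		// Lock step with the server: one Send per Recv, never in parallel.
		go func() {
			if err := c.Send(handle(ctx, req)); err != nil {
				fmt.Println("send failed:", err)
			}
		}()
	}
}

// fakeStream feeds requests from a channel and honors a context.
type fakeStream struct {
	ctx  context.Context
	reqs chan string
}

func (f *fakeStream) Context() context.Context { return f.ctx }
func (f *fakeStream) Recv() (string, error) {
	select {
	case r := <-f.reqs:
		return r, nil
	case <-f.ctx.Done():
		return "", f.ctx.Err()
	}
}
func (f *fakeStream) Send(s string) error { fmt.Println("sent:", s); return nil }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	fs := &fakeStream{ctx: ctx, reqs: make(chan string, 1)}
	fs.reqs <- "q1"

	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel() // simulate the frontend tearing the stream down
	}()

	err := process(fs, func(ctx context.Context, req string) string {
		select {
		case <-time.After(time.Second): // pretend the query takes a while
			return "result for " + req
		case <-ctx.Done():
			return "cancelled " + req + ": " + ctx.Err().Error()
		}
	})
	fmt.Println("process returned:", err)
	time.Sleep(10 * time.Millisecond) // give the cancelled handler time to Send
}
```

Because each Recv is answered by exactly one Send, at most one query per stream is ever in flight, which is the invariant the review thread above relies on.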
