Skip to content

Fix canceled distributor push requests as 499 instead of 500 #5018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
* [FEATURE] Query Frontend: Log query params in query frontend even if error happens. #5005
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
* [BUGFIX] Fix canceled distributor push requests as 499 instead of 500. #5018
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I bet many users are going to understand this line as clients will receive 499. Can you make it obvious that this only affects internal metrics and logs?


## 1.14.0 2022-12-02

Expand Down
28 changes: 28 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,34 @@ func TestDistributor_Push(t *testing.T) {
}
}

func TestDistributor_ContextCanceledRequest(t *testing.T) {
t.Cleanup(func() {
_ = func() time.Time { return time.Now() }
})

ds, ings, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
})

// Lock all mockIngester instances, so they will be waiting
for i := range ings {
ings[i].Lock()
defer func(ing *mockIngester) {
ing.Unlock()
}(ings[i])
}

ctx := user.InjectOrgID(context.Background(), "user")
ctx, cancel := context.WithCancel(ctx)
cancel()
request := makeWriteRequest(123456789000, 1, 1)
_, err := ds[0].Push(ctx, request)
require.Error(t, err)
require.ErrorIs(t, err, context.Canceled)
}

func TestDistributor_MetricsCleanup(t *testing.T) {
dists, _, regs, _ := prepare(t, prepConfig{
numDistributors: 1,
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package push

import (
"context"
"errors"
"net/http"

"github.com/go-kit/log/level"
Expand All @@ -13,6 +14,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/log"
)

const statusClientCanceledRequest = 499

// Func defines the type of the push. It is similar to http.HandlerFunc.
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

Expand Down Expand Up @@ -42,6 +45,12 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
}

if _, err := push(ctx, &req.WriteRequest); err != nil {
if errors.Is(err, context.Canceled) {
http.Error(w, err.Error(), statusClientCanceledRequest)
level.Warn(logger).Log("msg", "push request canceled", "err", err)
return
}

resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
23 changes: 23 additions & 0 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package push
import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -34,6 +35,28 @@ func TestHandler_cortexWriteRequest(t *testing.T) {
assert.Equal(t, 200, resp.Code)
}

func TestHandler_contextCanceledRequest(t *testing.T) {
req := createRequest(t, createCortexWriteRequestProtobuf(t, false))
resp := httptest.NewRecorder()
sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(100000, sourceIPs, func(_ context.Context, _ *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return nil, fmt.Errorf("the request failed: %w", context.Canceled)
})
handler.ServeHTTP(resp, req)
assert.Equal(t, 499, resp.Code)
}

func TestHandler_contextDeadlineExceededRequest(t *testing.T) {
req := createRequest(t, createCortexWriteRequestProtobuf(t, false))
resp := httptest.NewRecorder()
sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(100000, sourceIPs, func(_ context.Context, _ *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return nil, fmt.Errorf("the request failed: %w", context.DeadlineExceeded)
})
handler.ServeHTTP(resp, req)
assert.Equal(t, 500, resp.Code)
}

func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) {
for _, req := range []*http.Request{
createRequest(t, createCortexWriteRequestProtobuf(t, true)),
Expand Down