Skip to content

Commit 08433eb

Browse files
committed
Canceled distributor push requests return 499
Signed-off-by: wangguoliang <iamwgliang@gmail.com>
1 parent 734b0c7 commit 08433eb

File tree

3 files changed

+49
-0
lines changed

3 files changed

+49
-0
lines changed

pkg/distributor/distributor_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,34 @@ func TestDistributor_Push(t *testing.T) {
290290
}
291291
}
292292

293+
func TestDistributor_ContextCanceledRequest(t *testing.T) {
294+
t.Cleanup(func() {
295+
_ = func() time.Time { return time.Now() }
296+
})
297+
298+
ds, ings, _, _ := prepare(t, prepConfig{
299+
numIngesters: 3,
300+
happyIngesters: 3,
301+
numDistributors: 1,
302+
})
303+
304+
// Lock all mockIngester instances, so they will be waiting
305+
for i := range ings {
306+
ings[i].Lock()
307+
defer func(ing *mockIngester) {
308+
ing.Unlock()
309+
}(ings[i])
310+
}
311+
312+
ctx := user.InjectOrgID(context.Background(), "user")
313+
ctx, cancel := context.WithCancel(ctx)
314+
cancel()
315+
request := makeWriteRequest(123456789000, 1, 1)
316+
_, err := ds[0].Push(ctx, request)
317+
require.Error(t, err)
318+
require.ErrorIs(t, err, context.Canceled)
319+
}
320+
293321
func TestDistributor_MetricsCleanup(t *testing.T) {
294322
dists, _, regs, _ := prepare(t, prepConfig{
295323
numDistributors: 1,

pkg/util/push/push.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package push
22

33
import (
44
"context"
5+
"errors"
56
"net/http"
67

78
"github.com/go-kit/log/level"
@@ -13,6 +14,8 @@ import (
1314
"github.com/cortexproject/cortex/pkg/util/log"
1415
)
1516

17+
const statusClientCanceledRequest = 499
18+
1619
// Func defines the type of the push. It is similar to http.HandlerFunc.
1720
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
1821

@@ -42,6 +45,12 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
4245
}
4346

4447
if _, err := push(ctx, &req.WriteRequest); err != nil {
48+
if errors.Is(err, context.Canceled) {
49+
http.Error(w, err.Error(), statusClientCanceledRequest)
50+
level.Warn(logger).Log("msg", "push request canceled", "err", err)
51+
return
52+
}
53+
4554
resp, ok := httpgrpc.HTTPResponseFromError(err)
4655
if !ok {
4756
http.Error(w, err.Error(), http.StatusInternalServerError)

pkg/util/push/push_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package push
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"net/http"
78
"net/http/httptest"
89
"testing"
@@ -34,6 +35,17 @@ func TestHandler_cortexWriteRequest(t *testing.T) {
3435
assert.Equal(t, 200, resp.Code)
3536
}
3637

38+
func TestHandler_contextCanceledRequest(t *testing.T) {
39+
req := createRequest(t, createCortexWriteRequestProtobuf(t, false))
40+
resp := httptest.NewRecorder()
41+
sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
42+
handler := Handler(100000, sourceIPs, func(_ context.Context, _ *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
43+
return nil, fmt.Errorf("the request failed: %w", context.Canceled)
44+
})
45+
handler.ServeHTTP(resp, req)
46+
assert.Equal(t, 499, resp.Code)
47+
}
48+
3749
func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) {
3850
for _, req := range []*http.Request{
3951
createRequest(t, createCortexWriteRequestProtobuf(t, true)),

0 commit comments

Comments
 (0)