
Don't track an intentionally aborted HA tracker CAS operation as an error #3745


Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
* [ENHANCEMENT] Prometheus upgraded. #3739
* Avoid unnecessary `runtime.GC()` during compactions.
* Prevent compaction loop in TSDB on data gap.
* [BUGFIX] HA Tracker: don't track an intentionally aborted CAS operation as an error in the `cortex_kv_request_duration_seconds` metric. #3745

## 1.7.0 in progress

7 changes: 6 additions & 1 deletion pkg/distributor/ha_tracker.go
@@ -276,7 +276,7 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
}

// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
-// is less than failOver timeout amount of time since the timestamp in the KV store.
+// is less than failover timeout amount of time since the timestamp in the KV store.
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
}
@@ -306,6 +306,11 @@ func (e replicasNotMatchError) Is(err error) bool {
return ok1 || ok2
}

// IsOperationAborted reports whether the error was caused by an intentionally aborted operation.
func (e replicasNotMatchError) IsOperationAborted() bool {
return true
}

type tooManyClustersError struct {
limit int
}
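For context, the HA tracker aborts the CAS on purpose when another replica is already elected. Below is a minimal sketch of that pattern; the callback body is illustrative rather than the exact checkKVStore logic, and it assumes the usual imports (context, time, pkg/ring/kv, prometheus/pkg/timestamp) inside package distributor:

```go
// Sketch: intentionally aborting a CAS. Returning retry=false together with an
// error implementing IsOperationAborted() makes the KV metrics wrapper record
// the operation as "200" instead of "500".
func casUpdate(ctx context.Context, client kv.Client, key, replica string) error {
	return client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
		if desc, ok := in.(*ReplicaDesc); ok && desc.Replica != replica {
			// Another replica is elected: abort the CAS on purpose.
			return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
		}
		// Elected replica (or no entry yet): write the updated descriptor.
		return &ReplicaDesc{Replica: replica, ReceivedAt: timestamp.FromTime(time.Now())}, true, nil
	})
}
```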
43 changes: 40 additions & 3 deletions pkg/distributor/ha_tracker_test.go
@@ -7,6 +7,8 @@ import (
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -17,6 +19,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
@@ -196,13 +199,14 @@ func TestCheckReplicaMultiCluster(t *testing.T) {
replica1 := "replica1"
replica2 := "replica2"

reg := prometheus.NewPedanticRegistry()
c, err := newClusterTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Store: "inmemory"},
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
-	}, trackerLimits{maxClusters: 100}, nil)
+	}, trackerLimits{maxClusters: 100}, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -224,20 +228,34 @@
assert.NoError(t, err)
err = c.checkReplica(context.Background(), "user", "c2", replica1)
assert.NoError(t, err)

// We expect no CAS operation failures.
metrics, err := reg.Gather()
require.NoError(t, err)

assert.Equal(t, uint64(0), util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", "5.*"),
}))
assert.Greater(t, util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", "2.*"),
}), uint64(0))
}

func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
start := mtime.Now()
replica1 := "replica1"
replica2 := "replica2"

reg := prometheus.NewPedanticRegistry()
c, err := newClusterTracker(HATrackerConfig{
EnableHATracker: true,
KVStore: kv.Config{Store: "inmemory"},
UpdateTimeout: 100 * time.Millisecond,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
-	}, trackerLimits{maxClusters: 100}, nil)
+	}, trackerLimits{maxClusters: 100}, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
@@ -259,7 +277,13 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
err = c.checkReplica(context.Background(), "user", "c2", replica1)
assert.NoError(t, err)

-	// Wait more than the timeout.
+	// Reject samples from replica 2 in each cluster.
+	err = c.checkReplica(context.Background(), "user", "c1", replica2)
+	assert.Error(t, err)
+	err = c.checkReplica(context.Background(), "user", "c2", replica2)
+	assert.Error(t, err)
+
+	// Wait more than the failover timeout.
mtime.NowForce(start.Add(1100 * time.Millisecond))

// Accept a sample from c1/replica2.
@@ -271,6 +295,19 @@ func TestCheckReplicaMultiClusterTimeout(t *testing.T) {
assert.Error(t, err)
err = c.checkReplica(context.Background(), "user", "c2", replica1)
assert.NoError(t, err)

// We expect no CAS operation failures.
metrics, err := reg.Gather()
require.NoError(t, err)

assert.Equal(t, uint64(0), util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", "5.*"),
}))
assert.Greater(t, util.GetSumOfHistogramSampleCount(metrics, "cortex_kv_request_duration_seconds", labels.Selector{
labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", "2.*"),
}), uint64(0))
}

// Test that writes only happen every update timeout.
14 changes: 11 additions & 3 deletions pkg/ring/kv/metrics.go
@@ -20,14 +20,22 @@ func RegistererWithKVName(reg prometheus.Registerer, name string) prometheus.Reg
return prometheus.WrapRegistererWith(prometheus.Labels{"kv_name": name}, reg)
}

-// errorCode converts an error into an HTTP status code, modified from weaveworks/common/instrument
-func errorCode(err error) string {
+// getCasErrorCode converts the provided CAS error into the code that should be used to track the operation
+// in metrics.
+func getCasErrorCode(err error) string {
if err == nil {
return "200"
}
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
return strconv.Itoa(int(resp.GetCode()))
}

// If the error has been returned to abort the CAS operation, then we shouldn't
// consider it an error when tracking metrics.
if casErr, ok := err.(interface{ IsOperationAborted() bool }); ok && casErr.IsOperationAborted() {
return "200"
}

return "500"
}

@@ -81,7 +89,7 @@ func (m metrics) Delete(ctx context.Context, key string) error {
}

func (m metrics) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
-	return instrument.CollectedRequest(ctx, "CAS", m.requestDuration, errorCode, func(ctx context.Context) error {
+	return instrument.CollectedRequest(ctx, "CAS", m.requestDuration, getCasErrorCode, func(ctx context.Context) error {
return m.c.CAS(ctx, key, f)
})
}
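To illustrate the duck-typed check above end to end, here is a small, self-contained sketch; statusCode and abortErr are hypothetical stand-ins for getCasErrorCode and replicasNotMatchError:

```go
package main

import "fmt"

// statusCode is a hypothetical stand-in for getCasErrorCode: intentional
// aborts are reported as "200", any other non-nil error as "500".
func statusCode(err error) string {
	if err == nil {
		return "200"
	}
	if aborted, ok := err.(interface{ IsOperationAborted() bool }); ok && aborted.IsOperationAborted() {
		return "200" // intentionally aborted: not a failure for metrics purposes
	}
	return "500"
}

// abortErr is a hypothetical error type flagging an intentional abort.
type abortErr struct{}

func (abortErr) Error() string            { return "operation aborted" }
func (abortErr) IsOperationAborted() bool { return true }

func main() {
	fmt.Println(statusCode(nil))                // "200"
	fmt.Println(statusCode(abortErr{}))         // "200": aborted on purpose
	fmt.Println(statusCode(fmt.Errorf("boom"))) // "500"
}
```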
37 changes: 37 additions & 0 deletions pkg/util/metrics_helper.go
@@ -10,6 +10,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prometheus/pkg/labels"
)

// Data for single value (counter/gauge) with labels.
@@ -661,3 +662,39 @@ func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser {
}
return data
}

// FromLabelPairsToLabels converts dto.LabelPair into labels.Labels.
func FromLabelPairsToLabels(pairs []*dto.LabelPair) labels.Labels {
builder := labels.NewBuilder(nil)
for _, pair := range pairs {
builder.Set(pair.GetName(), pair.GetValue())
}
return builder.Labels()
}

// GetSumOfHistogramSampleCount returns the sum of the sample counts of histograms matching the provided metric name
// and optional label matchers. Returns 0 if no metric matches.
func GetSumOfHistogramSampleCount(families []*dto.MetricFamily, metricName string, matchers labels.Selector) uint64 {
Contributor Author: In the unit tests I've added in the distributor I couldn't use the typical testutil.GatherAndCompare() because the histogram tracks the request duration (which is unpredictable). Since we don't have a way to just test the histogram sample count, I've introduced this function to ease it (and future tests too).

sum := uint64(0)

for _, metric := range families {
if metric.GetName() != metricName {
continue
}

if metric.GetType() != dto.MetricType_HISTOGRAM {
continue
}

for _, series := range metric.GetMetric() {
if !matchers.Matches(FromLabelPairsToLabels(series.GetLabel())) {
continue
}

histogram := series.GetHistogram()
sum += histogram.GetSampleCount()
}
}

return sum
}
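Building on the review comment above, here's a usage sketch of GetSumOfHistogramSampleCount mirroring the test assertions; the registered histogram, observations, and label values are illustrative:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/cortexproject/cortex/pkg/util"
)

func main() {
	reg := prometheus.NewPedanticRegistry()
	hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "cortex_kv_request_duration_seconds",
		Help: "KV request duration.",
	}, []string{"operation", "status_code"})
	reg.MustRegister(hist)

	// Record two successful CAS operations with unpredictable durations.
	hist.WithLabelValues("CAS", "200").Observe(0.01)
	hist.WithLabelValues("CAS", "200").Observe(0.02)

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}

	// Sum the sample counts of all matching histogram series, ignoring
	// the (unpredictable) observed values themselves.
	count := util.GetSumOfHistogramSampleCount(families, "cortex_kv_request_duration_seconds", labels.Selector{
		labels.MustNewMatcher(labels.MatchEqual, "operation", "CAS"),
	})
	fmt.Println(count) // 2
}
```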