Skip to content

Small HA tracker cleanup #3808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 11, 2021
193 changes: 139 additions & 54 deletions integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"context"
"errors"
"sort"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
Expand All @@ -20,18 +22,104 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/etcd"
)

func TestKV_List_Delete(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
// TestKVList verifies that List returns exactly the keys previously created
// via CAS, against each supported KV backend (etcd and consul).
func TestKVList(t *testing.T) {
	testKVs(t, func(t *testing.T, client kv.Client, reg *prometheus.Registry) {
		// Create keys to list back
		keysToCreate := []string{"key-a", "key-b", "key-c"}
		for _, key := range keysToCreate {
			err := client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) {
				return key, false, nil
			})
			require.NoError(t, err, "could not create key")
		}

		// Get list of keys and sort them, since backends make no ordering guarantee.
		keys, err := client.List(context.Background(), "")
		require.NoError(t, err, "could not list keys")
		sort.Strings(keys)
		require.Equal(t, keysToCreate, keys, "returned key paths did not match created paths")

		// One List call plus one CAS per created key should have been recorded.
		verifyClientMetrics(t, reg, map[string]uint64{
			"List": 1,
			"CAS":  3,
		})
	})
}

reg := prometheus.NewRegistry()
// TestKVDelete checks that a key stored via CAS can be removed with Delete
// and that a subsequent Get no longer finds it, on every KV backend.
func TestKVDelete(t *testing.T) {
	testKVs(t, func(t *testing.T, client kv.Client, reg *prometheus.Registry) {
		const key = "key-to-delete"

		// Store a value under the key so there is something to delete.
		err := client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) {
			return key, false, nil
		})
		require.NoError(t, err, "object could not be created")

		// Remove it again.
		require.NoError(t, client.Delete(context.Background(), key))

		// Reading it back must now yield nothing.
		got, err := client.Get(context.Background(), key)
		require.NoError(t, err, "unexpected error")
		require.Nil(t, got, "object was not deleted")

		// Exactly one operation of each kind should have been recorded.
		verifyClientMetrics(t, reg, map[string]uint64{
			"Delete": 1,
			"CAS":    1,
			"GET":    1,
		})
	})
}

func TestKVWatchAndDelete(t *testing.T) {
testKVs(t, func(t *testing.T, client kv.Client, reg *prometheus.Registry) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := client.CAS(context.Background(), "key-before-watch", func(in interface{}) (out interface{}, retry bool, err error) {
return "value-before-watch", false, nil
})
require.NoError(t, err)

w := &watcher{}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
w.watch(ctx, client)
}()

err = client.CAS(context.Background(), "key-to-delete", func(in interface{}) (out interface{}, retry bool, err error) {
return "value-to-delete", false, nil
})
require.NoError(t, err, "object could not be created")

// Give watcher time to receive notification.
time.Sleep(500 * time.Millisecond)

// Now delete it
err = client.Delete(context.Background(), "key-to-delete")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a sleep between creating the key-to-delete and deleting it? I'm wondering what happens if the two operations happens very close each other: the watcher may not get any notification (depending on how the backend is implemented).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's do that.

require.NoError(t, err)

// Give watcher time to receive notification for delete, if any.
time.Sleep(500 * time.Millisecond)

// Stop the watcher
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we cancel immediately there's no guarantee the WatchPrefix() has actually had the time to receive the update. I think we should wait "a bit".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, and it's fixed in #3809, but let's add a wait here.

wg.Wait()

// Consul reports:
// map[key-before-watch:[value-before-watch] key-to-delete:[value-to-delete]]
//
// Etcd reports:
// map[key-to-delete:[value-to-delete ""]]
t.Log(w.values)
})
}

func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client {
etcdSvc := e2edb.NewETCD()
require.NoError(t, scenario.StartAndWaitReady(etcdSvc))

etcdKv, err := kv.NewClient(kv.Config{
Store: "etcd",
Expand All @@ -46,6 +134,13 @@ func TestKV_List_Delete(t *testing.T) {
}, stringCodec{}, reg)
require.NoError(t, err)

return etcdKv
}

func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client {
consulSvc := e2edb.NewConsul()
require.NoError(t, scenario.StartAndWaitReady(consulSvc))

consulKv, err := kv.NewClient(kv.Config{
Store: "consul",
Prefix: "keys/",
Expand All @@ -60,58 +155,39 @@ func TestKV_List_Delete(t *testing.T) {
}, stringCodec{}, reg)
require.NoError(t, err)

kvs := []struct {
name string
kv kv.Client
}{
{"etcd", etcdKv},
{"consul", consulKv},
}
return consulKv
}

for _, kv := range kvs {
t.Run(kv.name+"_list", func(t *testing.T) {
// Create keys to list back
keysToCreate := []string{"key-a", "key-b", "key-c"}
for _, key := range keysToCreate {
err := kv.kv.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) {
return key, false, nil
})
require.NoError(t, err, "could not create key")
}
// testKVs runs testFn once per supported KV backend (etcd and consul), each
// as its own subtest with a freshly provisioned backend and metrics registry.
func testKVs(t *testing.T, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
	setupFns := map[string]func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client{
		"etcd":   setupEtcd,
		"consul": setupConsul,
	}

	for name, setupFn := range setupFns {
		t.Run(name, func(t *testing.T) {
			testKVScenario(t, setupFn, testFn)
		})
	}
}

t.Run(kv.name+"_delete", func(t *testing.T) {
// Create a key
err = kv.kv.CAS(context.Background(), "key-to-delete", func(in interface{}) (out interface{}, retry bool, err error) {
return "key-to-delete", false, nil
})
require.NoError(t, err, "object could not be created")

// Now delete it
err = kv.kv.Delete(context.Background(), "key-to-delete")
require.NoError(t, err)
func testKVScenario(t *testing.T, kvSetupFn func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Get it back
v, err := kv.kv.Get(context.Background(), "key-to-delete")
require.NoError(t, err, "unexpected error")
require.Nil(t, v, "object was not deleted")
})
}
reg := prometheus.NewRegistry()
client := kvSetupFn(t, s, reg)
testFn(t, client, reg)
}

// Ensure the proper histogram metrics are reported
func verifyClientMetrics(t *testing.T, reg *prometheus.Registry, sampleCounts map[string]uint64) {
metrics, err := reg.Gather()
require.NoError(t, err)

require.Len(t, metrics, 1)
require.Equal(t, "cortex_kv_request_duration_seconds", metrics[0].GetName())
require.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
require.Len(t, metrics[0].GetMetric(), 8)

getMetricOperation := func(labels []*dto.LabelPair) (string, error) {
for _, l := range labels {
Expand All @@ -124,12 +200,9 @@ func TestKV_List_Delete(t *testing.T) {

for _, metric := range metrics[0].GetMetric() {
op, err := getMetricOperation(metric.Label)

require.NoErrorf(t, err, "No operation label found in metric %v", metric.String())
if op == "CAS" {
require.Equal(t, uint64(4), metric.GetHistogram().GetSampleCount())
} else {
require.Equal(t, uint64(1), metric.GetHistogram().GetSampleCount())
}
assert.Equal(t, sampleCounts[op], metric.GetHistogram().GetSampleCount(), op)
}
}

// stringCodec is a trivial kv codec storing plain strings as their raw bytes.
// Encode panics if given a non-string value; tests only ever store strings.
type stringCodec struct{}

func (c stringCodec) Decode(bb []byte) (interface{}, error) { return string(bb), nil }
func (c stringCodec) Encode(v interface{}) ([]byte, error)  { return []byte(v.(string)), nil }
func (c stringCodec) CodecID() string                       { return "stringCodec" }

// watcher accumulates, per key, every value delivered by WatchPrefix.
// It is not safe for concurrent use: values must only be read after
// watch has returned (callers synchronize via a WaitGroup).
type watcher struct {
	values map[string][]interface{}
}

// watch blocks, recording every update under the empty prefix, until ctx is
// cancelled.
func (w *watcher) watch(ctx context.Context, client kv.Client) {
	w.values = make(map[string][]interface{})
	client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
		w.values[key] = append(w.values[key], value)
		return true // keep watching until cancellation
	})
}
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
// ruler's dependency)
canJoinDistributorsRing := t.Cfg.isModuleEnabled(Distributor) || t.Cfg.isModuleEnabled(All)

t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer)
t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return
}
Expand Down
26 changes: 14 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand All @@ -30,7 +31,6 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/math"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -131,6 +131,7 @@ type Distributor struct {
services.Service

cfg Config
log log.Logger
ingestersRing ring.ReadRing
ingesterPool *ring_client.Pool
limits *validation.Overrides
Expand Down Expand Up @@ -209,7 +210,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer) (*Distributor, error) {
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) {
return ingester_client.MakeIngesterClient(addr, clientConfig)
Expand All @@ -219,13 +220,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

replicas, err := newClusterTracker(cfg.HATrackerConfig, limits, reg)
haTracker, err := newHATracker(cfg.HATrackerConfig, limits, reg, log)
if err != nil {
return nil, err
}

subservices := []services.Service(nil)
subservices = append(subservices, replicas)
subservices = append(subservices, haTracker)

// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
Expand All @@ -250,12 +251,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log.Logger),
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: replicas,
HATracker: haTracker,
activeUsers: util.NewActiveUsers(),
}

Expand Down Expand Up @@ -285,7 +287,8 @@ func (d *Distributor) running(ctx context.Context) error {
case <-metricsCleanupTimer.C:
inactiveUsers := d.activeUsers.PurgeInactiveUsers(time.Now().Add(-inactiveUserTimeout).UnixNano())
for _, userID := range inactiveUsers {
cleanupMetricsForUser(userID)
cleanupMetricsForUser(userID, d.log)
d.HATracker.cleanupHATrackerMetricsForUser(userID)
}
continue

Expand All @@ -298,7 +301,7 @@ func (d *Distributor) running(ctx context.Context) error {
}
}

func cleanupMetricsForUser(userID string) {
func cleanupMetricsForUser(userID string, log log.Logger) {
receivedSamples.DeleteLabelValues(userID)
receivedMetadata.DeleteLabelValues(userID)
incomingSamples.DeleteLabelValues(userID)
Expand All @@ -307,11 +310,10 @@ func cleanupMetricsForUser(userID string) {
latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)

if err := util.DeleteMatchingLabels(dedupedSamples, map[string]string{"user": userID}); err != nil {
level.Warn(log.Logger).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
level.Warn(log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
}

validation.DeletePerUserValidationMetrics(userID, log.Logger)
cleanupHATrackerMetricsForUser(userID, log.Logger)
validation.DeletePerUserValidationMetrics(userID, log)
}

// Called after distributor is asked to stop via StopAsync.
Expand Down Expand Up @@ -389,7 +391,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica

// At this point we know we have both HA labels, we should lookup
// the cluster/instance here to see if we want to accept this sample.
err := d.HATracker.checkReplica(ctx, userID, cluster, replica)
err := d.HATracker.checkReplica(ctx, userID, cluster, replica, time.Now())
// checkReplica should only have returned an error if there was a real error talking to Consul, or if the replica labels don't match.
if err != nil { // Don't accept the sample.
return false, err
Expand Down
Loading