-
Notifications
You must be signed in to change notification settings - Fork 820
Small HA tracker cleanup #3808
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Small HA tracker cleanup #3808
Changes from all commits
38980e2
af97d98
95ca64f
552e1dd
fc3c683
e68913e
de2ed44
7669ae5
40550bf
32039e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,11 +6,13 @@ import ( | |
"context" | ||
"errors" | ||
"sort" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
dto "github.com/prometheus/client_model/go" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/cortexproject/cortex/integration/e2e" | ||
|
@@ -20,18 +22,104 @@ import ( | |
"github.com/cortexproject/cortex/pkg/ring/kv/etcd" | ||
) | ||
|
||
func TestKV_List_Delete(t *testing.T) { | ||
s, err := e2e.NewScenario(networkName) | ||
require.NoError(t, err) | ||
defer s.Close() | ||
func TestKVList(t *testing.T) { | ||
testKVs(t, func(t *testing.T, client kv.Client, reg *prometheus.Registry) { | ||
// Create keys to list back | ||
keysToCreate := []string{"key-a", "key-b", "key-c"} | ||
for _, key := range keysToCreate { | ||
err := client.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) { | ||
return key, false, nil | ||
}) | ||
require.NoError(t, err, "could not create key") | ||
} | ||
|
||
// Start dependencies | ||
etcdSvc := e2edb.NewETCD() | ||
consulSvc := e2edb.NewConsul() | ||
// Get list of keys and sort them | ||
keys, err := client.List(context.Background(), "") | ||
require.NoError(t, err, "could not list keys") | ||
sort.Strings(keys) | ||
require.Equal(t, keysToCreate, keys, "returned key paths did not match created paths") | ||
|
||
require.NoError(t, s.StartAndWaitReady(etcdSvc, consulSvc)) | ||
verifyClientMetrics(t, reg, map[string]uint64{ | ||
"List": 1, | ||
"CAS": 3, | ||
}) | ||
}) | ||
} | ||
|
||
reg := prometheus.NewRegistry() | ||
func TestKVDelete(t *testing.T) { | ||
testKVs(t, func(t *testing.T, client kv.Client, reg *prometheus.Registry) { | ||
// Create a key | ||
err := client.CAS(context.Background(), "key-to-delete", func(in interface{}) (out interface{}, retry bool, err error) { | ||
return "key-to-delete", false, nil | ||
}) | ||
require.NoError(t, err, "object could not be created") | ||
|
||
// Now delete it | ||
err = client.Delete(context.Background(), "key-to-delete") | ||
require.NoError(t, err) | ||
|
||
// Get it back | ||
v, err := client.Get(context.Background(), "key-to-delete") | ||
require.NoError(t, err, "unexpected error") | ||
require.Nil(t, v, "object was not deleted") | ||
|
||
verifyClientMetrics(t, reg, map[string]uint64{ | ||
"Delete": 1, | ||
"CAS": 1, | ||
"GET": 1, | ||
}) | ||
}) | ||
} | ||
|
||
func TestKVWatchAndDelete(t *testing.T) { | ||
testKVs(t, func(t *testing.T, client kv.Client, reg *prometheus.Registry) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
err := client.CAS(context.Background(), "key-before-watch", func(in interface{}) (out interface{}, retry bool, err error) { | ||
return "value-before-watch", false, nil | ||
}) | ||
require.NoError(t, err) | ||
|
||
w := &watcher{} | ||
wg := &sync.WaitGroup{} | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
w.watch(ctx, client) | ||
}() | ||
|
||
err = client.CAS(context.Background(), "key-to-delete", func(in interface{}) (out interface{}, retry bool, err error) { | ||
return "value-to-delete", false, nil | ||
}) | ||
require.NoError(t, err, "object could not be created") | ||
|
||
// Give watcher time to receive notification. | ||
time.Sleep(500 * time.Millisecond) | ||
|
||
// Now delete it | ||
err = client.Delete(context.Background(), "key-to-delete") | ||
require.NoError(t, err) | ||
|
||
// Give watcher time to receive notification for delete, if any. | ||
time.Sleep(500 * time.Millisecond) | ||
|
||
// Stop the watcher | ||
cancel() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we cancel immediately there's no guarantee the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, and it's fixed in #3809, but let's add a wait here. |
||
wg.Wait() | ||
|
||
// Consul reports: | ||
// map[key-before-watch:[value-before-watch] key-to-delete:[value-to-delete]] | ||
// | ||
// Etcd reports: | ||
// map[key-to-delete:[value-to-delete ""]] | ||
t.Log(w.values) | ||
}) | ||
} | ||
|
||
func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client { | ||
etcdSvc := e2edb.NewETCD() | ||
require.NoError(t, scenario.StartAndWaitReady(etcdSvc)) | ||
|
||
etcdKv, err := kv.NewClient(kv.Config{ | ||
Store: "etcd", | ||
|
@@ -46,6 +134,13 @@ func TestKV_List_Delete(t *testing.T) { | |
}, stringCodec{}, reg) | ||
require.NoError(t, err) | ||
|
||
return etcdKv | ||
} | ||
|
||
func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client { | ||
consulSvc := e2edb.NewConsul() | ||
require.NoError(t, scenario.StartAndWaitReady(consulSvc)) | ||
|
||
consulKv, err := kv.NewClient(kv.Config{ | ||
Store: "consul", | ||
Prefix: "keys/", | ||
|
@@ -60,58 +155,39 @@ func TestKV_List_Delete(t *testing.T) { | |
}, stringCodec{}, reg) | ||
require.NoError(t, err) | ||
|
||
kvs := []struct { | ||
name string | ||
kv kv.Client | ||
}{ | ||
{"etcd", etcdKv}, | ||
{"consul", consulKv}, | ||
} | ||
return consulKv | ||
} | ||
|
||
for _, kv := range kvs { | ||
t.Run(kv.name+"_list", func(t *testing.T) { | ||
// Create keys to list back | ||
keysToCreate := []string{"key-a", "key-b", "key-c"} | ||
for _, key := range keysToCreate { | ||
err := kv.kv.CAS(context.Background(), key, func(in interface{}) (out interface{}, retry bool, err error) { | ||
return key, false, nil | ||
}) | ||
require.NoError(t, err, "could not create key") | ||
} | ||
func testKVs(t *testing.T, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) { | ||
setupFns := map[string]func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client{ | ||
"etcd": setupEtcd, | ||
"consul": setupConsul, | ||
} | ||
|
||
// Get list of keys and sort them | ||
keys, err := kv.kv.List(context.Background(), "") | ||
require.NoError(t, err, "could not list keys") | ||
sort.Strings(keys) | ||
require.Equal(t, keysToCreate, keys, "returned key paths did not match created paths") | ||
for name, setupFn := range setupFns { | ||
t.Run(name, func(t *testing.T) { | ||
testKVScenario(t, setupFn, testFn) | ||
}) | ||
} | ||
} | ||
|
||
t.Run(kv.name+"_delete", func(t *testing.T) { | ||
// Create a key | ||
err = kv.kv.CAS(context.Background(), "key-to-delete", func(in interface{}) (out interface{}, retry bool, err error) { | ||
return "key-to-delete", false, nil | ||
}) | ||
require.NoError(t, err, "object could not be created") | ||
|
||
// Now delete it | ||
err = kv.kv.Delete(context.Background(), "key-to-delete") | ||
require.NoError(t, err) | ||
func testKVScenario(t *testing.T, kvSetupFn func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) { | ||
s, err := e2e.NewScenario(networkName) | ||
require.NoError(t, err) | ||
defer s.Close() | ||
|
||
// Get it back | ||
v, err := kv.kv.Get(context.Background(), "key-to-delete") | ||
require.NoError(t, err, "unexpected error") | ||
require.Nil(t, v, "object was not deleted") | ||
}) | ||
} | ||
reg := prometheus.NewRegistry() | ||
client := kvSetupFn(t, s, reg) | ||
testFn(t, client, reg) | ||
} | ||
|
||
// Ensure the proper histogram metrics are reported | ||
func verifyClientMetrics(t *testing.T, reg *prometheus.Registry, sampleCounts map[string]uint64) { | ||
metrics, err := reg.Gather() | ||
require.NoError(t, err) | ||
|
||
require.Len(t, metrics, 1) | ||
require.Equal(t, "cortex_kv_request_duration_seconds", metrics[0].GetName()) | ||
require.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType()) | ||
require.Len(t, metrics[0].GetMetric(), 8) | ||
|
||
getMetricOperation := func(labels []*dto.LabelPair) (string, error) { | ||
for _, l := range labels { | ||
|
@@ -124,12 +200,9 @@ func TestKV_List_Delete(t *testing.T) { | |
|
||
for _, metric := range metrics[0].GetMetric() { | ||
op, err := getMetricOperation(metric.Label) | ||
|
||
require.NoErrorf(t, err, "No operation label found in metric %v", metric.String()) | ||
if op == "CAS" { | ||
require.Equal(t, uint64(4), metric.GetHistogram().GetSampleCount()) | ||
} else { | ||
require.Equal(t, uint64(1), metric.GetHistogram().GetSampleCount()) | ||
} | ||
assert.Equal(t, sampleCounts[op], metric.GetHistogram().GetSampleCount(), op) | ||
} | ||
} | ||
|
||
|
@@ -138,3 +211,15 @@ type stringCodec struct{} | |
func (c stringCodec) Decode(bb []byte) (interface{}, error) { return string(bb), nil } | ||
func (c stringCodec) Encode(v interface{}) ([]byte, error) { return []byte(v.(string)), nil } | ||
func (c stringCodec) CodecID() string { return "stringCodec" } | ||
|
||
type watcher struct { | ||
values map[string][]interface{} | ||
} | ||
|
||
func (w *watcher) watch(ctx context.Context, client kv.Client) { | ||
w.values = map[string][]interface{}{} | ||
client.WatchPrefix(ctx, "", func(key string, value interface{}) bool { | ||
w.values[key] = append(w.values[key], value) | ||
return true | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a sleep between creating the
key-to-delete
and deleting it? I'm wondering what happens if the two operations happens very close each other: the watcher may not get any notification (depending on how the backend is implemented).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, let's do that.