Skip to content

Add new metrics to aid troubleshooting tombstone convergence. #4231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
* [ENHANCEMENT] Added zone-awareness support to alertmanager for use when sharding is enabled. When zone-awareness is enabled, alerts will be replicated across availability zones. #4204
* [ENHANCEMENT] Added `tenant_ids` tag to tracing spans #4147
* [ENHANCEMENT] Ring, query-frontend: Avoid using automatic private IPs (APIPA) when discovering IP address from the interface during the registration of the instance in the ring, or by query-frontend when used with query-scheduler. APIPA still used as last resort with logging indicating usage. #4032
* [ENHANCEMENT] Memberlist: introduced new metrics to aid troubleshooting tombstone convergence: #4231
* `memberlist_client_kv_store_value_tombstones`
* `memberlist_client_kv_store_value_tombstones_removed_total`
* `memberlist_client_messages_to_broadcast_dropped_total`
* [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128
* [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176
* [BUGFIX] Alertmanager: fix Alertmanager status page if clustering via gossip is disabled or sharding is enabled. #4184
Expand Down
8 changes: 8 additions & 0 deletions integration/integration_memberlist_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,23 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total"))

// All Cortex servers should initially have no tombstones; nobody has left yet.
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(0), "memberlist_client_kv_store_value_tombstones"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(0), "memberlist_client_kv_store_value_tombstones"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(0), "memberlist_client_kv_store_value_tombstones"))

require.NoError(t, s.Stop(cortex1))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(1), "memberlist_client_kv_store_value_tombstones"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(1), "memberlist_client_kv_store_value_tombstones"))

require.NoError(t, s.Stop(cortex2))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(1*512), "cortex_ring_tokens_total"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count"))
require.NoError(t, cortex3.WaitSumMetrics(e2e.Equals(2), "memberlist_client_kv_store_value_tombstones"))

require.NoError(t, s.Stop(cortex3))
}
Expand Down
22 changes: 15 additions & 7 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,16 @@ type KV struct {
totalSizeOfPushes prometheus.Counter
numberOfBroadcastMessagesInQueue prometheus.GaugeFunc
totalSizeOfBroadcastMessagesInQueue prometheus.Gauge
numberOfBroadcastMessagesDropped prometheus.Counter
casAttempts prometheus.Counter
casFailures prometheus.Counter
casSuccesses prometheus.Counter
watchPrefixDroppedNotifications *prometheus.CounterVec

storeValuesDesc *prometheus.Desc
storeSizesDesc *prometheus.Desc
storeValuesDesc *prometheus.Desc
storeSizesDesc *prometheus.Desc
storeTombstones *prometheus.GaugeVec
storeRemovedTombstones *prometheus.CounterVec

memberlistMembersCount prometheus.GaugeFunc
memberlistHealthScore prometheus.GaugeFunc
Expand Down Expand Up @@ -625,7 +628,7 @@ func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint,
if mr, ok := out.(Mergeable); ok {
// remove ALL tombstones before returning to client.
// No need for clients to see them.
mr.RemoveTombstones(time.Time{})
_, _ = mr.RemoveTombstones(time.Time{})
}
}

Expand Down Expand Up @@ -883,14 +886,16 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{})
func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) {
data, err := codec.Encode(change)
if err != nil {
level.Error(m.logger).Log("msg", "failed to encode change", "err", err)
level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err)
m.numberOfBroadcastMessagesDropped.Inc()
return
}

kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID()}
pairData, err := kvPair.Marshal()
if err != nil {
level.Error(m.logger).Log("msg", "failed to serialize KV pair", "err", err)
level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err)
m.numberOfBroadcastMessagesDropped.Inc()
return
}

Expand All @@ -901,7 +906,8 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
//
// Typically messages are smaller (when dealing with couple of updates only), but can get bigger
// when broadcasting result of push/pull update.
level.Debug(m.logger).Log("msg", "broadcast message too big, not broadcasting", "len", len(pairData))
level.Debug(m.logger).Log("msg", "broadcast message too big, not broadcasting", "key", key, "version", version, "len", len(pairData))
m.numberOfBroadcastMessagesDropped.Inc()
return
}

Expand Down Expand Up @@ -1191,7 +1197,9 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui

if m.cfg.LeftIngestersTimeout > 0 {
limit := time.Now().Add(-m.cfg.LeftIngestersTimeout)
result.RemoveTombstones(limit)
total, removed := result.RemoveTombstones(limit)
m.storeTombstones.WithLabelValues(key).Set(float64(total))
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))
}

encoded, err := codec.Encode(result)
Expand Down
6 changes: 4 additions & 2 deletions pkg/ring/kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func (d *data) MergeContent() []string {
return out
}

func (d *data) RemoveTombstones(limit time.Time) {
func (d *data) RemoveTombstones(limit time.Time) (_, _ int) {
// nothing to do
return
}

func (d *data) getAllTokens() []uint32 {
Expand Down Expand Up @@ -870,8 +871,9 @@ func (dc distributedCounter) MergeContent() []string {
return out
}

func (dc distributedCounter) RemoveTombstones(limit time.Time) {
func (dc distributedCounter) RemoveTombstones(limit time.Time) (_, _ int) {
// nothing to do
return
}

type distributedCounterCodec struct{}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ring/kv/memberlist/mergeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ type Mergeable interface {
// Remove tombstones older than given limit from this mergeable.
// If limit is zero time, remove all tombstones. Memberlist client calls this method with zero limit each
// time when client is accessing value from the store. It can be used to hide tombstones from the clients.
RemoveTombstones(limit time.Time)
// Returns the total number of tombstones present and the number of removed tombstones by this invocation.
RemoveTombstones(limit time.Time) (total, removed int)
}
24 changes: 24 additions & 0 deletions pkg/ring/kv/memberlist/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Total size of messages waiting in the broadcast queue",
})

m.numberOfBroadcastMessagesDropped = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "messages_to_broadcast_dropped_total",
Help: "Number of broadcast messages intended to be sent but were dropped due to encoding errors or for being too big",
})

m.casAttempts = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Expand Down Expand Up @@ -115,6 +122,20 @@ func (m *KV) createAndRegisterMetrics() {
"Sizes of values in KV Store in bytes",
[]string{"key"}, nil)

m.storeTombstones = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "kv_store_value_tombstones",
Help: "Number of tombstones currently present in KV store values",
}, []string{"key"})

m.storeRemovedTombstones = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "kv_store_value_tombstones_removed_total",
Help: "Total number of tombstones which have been removed from KV store values",
}, []string{"key"})

m.memberlistMembersCount = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Expand Down Expand Up @@ -162,10 +183,13 @@ func (m *KV) createAndRegisterMetrics() {
m.totalSizeOfPushes,
m.totalSizeOfPulls,
m.totalSizeOfBroadcastMessagesInQueue,
m.numberOfBroadcastMessagesDropped,
m.casAttempts,
m.casFailures,
m.casSuccesses,
m.watchPrefixDroppedNotifications,
m.storeTombstones,
m.storeRemovedTombstones,
m.memberlistMembersCount,
m.memberlistHealthScore,
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,19 @@ func resolveConflicts(normalizedIngesters map[string]InstanceDesc) {
}

// RemoveTombstones removes LEFT ingesters older than given time limit. If time limit is zero, remove all LEFT ingesters.
func (d *Desc) RemoveTombstones(limit time.Time) {
removed := 0
func (d *Desc) RemoveTombstones(limit time.Time) (total, removed int) {
for n, ing := range d.Ingesters {
if ing.State == LEFT && (limit.IsZero() || time.Unix(ing.Timestamp, 0).Before(limit)) {
// remove it
delete(d.Ingesters, n)
removed++
if ing.State == LEFT {
if limit.IsZero() || time.Unix(ing.Timestamp, 0).Before(limit) {
// remove it
delete(d.Ingesters, n)
removed++
} else {
total++
}
}
}
return
}

func (d *Desc) getTokensInfo() map[uint32]instanceInfo {
Expand Down