Skip to content

TSDB: fix user metrics when using blocks storage #1990

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ instructions below to upgrade your Postgres.
* [BUGFIX] TSDB: `experimental.tsdb.ship-interval` of <=0 treated as disabled instead of allowing panic. #1975
* [BUGFIX] TSDB: Fixed `cortex_ingester_queried_samples` and `cortex_ingester_queried_series` metrics when using block storage. #1981
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series` and `cortex_ingester_memory_users` metrics when using with the experimental TSDB blocks storage. #1982
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series_created_total` and `cortex_ingester_memory_series_removed_total` metrics when using TSDB blocks storage. #1990

### Upgrading Postgres (if you're using configs service)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer),
metrics: newIngesterMetrics(registerer, true),
limits: limits,
chunkStore: chunkStore,
quit: make(chan struct{}),
Expand Down
17 changes: 10 additions & 7 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type TSDBState struct {
// transferring to a joining ingester
transferOnce sync.Once

shipperMetrics *shipperMetrics
tsdbMetrics *tsdbMetrics
}

// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
Expand All @@ -59,15 +59,15 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer),
metrics: newIngesterMetrics(registerer, false),
limits: limits,
chunkStore: nil,
quit: make(chan struct{}),

TSDBState: TSDBState{
dbs: make(map[string]*tsdb.DB),
bucket: bucketClient,
shipperMetrics: newShipperMetrics(registerer),
dbs: make(map[string]*tsdb.DB),
bucket: bucketClient,
tsdbMetrics: newTSDBMetrics(registerer),
},
}

Expand Down Expand Up @@ -419,10 +419,12 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)

// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
tsdbPromReg := prometheus.NewRegistry()

udir := i.cfg.TSDBConfig.BlocksDir(userID)

// Create a new user database
db, err := tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
db, err := tsdb.Open(udir, util.Logger, tsdbPromReg, &tsdb.Options{
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
NoLockfile: true,
Expand All @@ -443,7 +445,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {

// Create a new shipper for this database
if i.cfg.TSDBConfig.ShipInterval > 0 {
s := shipper.New(util.Logger, i.TSDBState.shipperMetrics.newRegistryForUser(userID), udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource)
s := shipper.New(util.Logger, tsdbPromReg, udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource)
i.done.Add(1)
go func() {
defer i.done.Done()
Expand All @@ -458,6 +460,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
}()
}

i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg)
return db, nil
}

Expand Down
36 changes: 36 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func TestIngester_v2Push(t *testing.T) {
"cortex_ingester_ingested_samples_failures_total",
"cortex_ingester_memory_series",
"cortex_ingester_memory_users",
"cortex_ingester_memory_series_created_total",
"cortex_ingester_memory_series_removed_total",
}
userID := "test"

Expand Down Expand Up @@ -73,6 +75,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
`,
},
"should soft fail on sample out of order": {
Expand Down Expand Up @@ -103,6 +111,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
`,
},
"should soft fail on sample out of bound": {
Expand Down Expand Up @@ -133,6 +147,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
`,
},
"should soft fail on two different sample values at the same timestamp": {
Expand Down Expand Up @@ -163,6 +183,12 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 1
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
`,
},
}
Expand Down Expand Up @@ -226,6 +252,8 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes
"cortex_ingester_ingested_samples_failures_total",
"cortex_ingester_memory_series",
"cortex_ingester_memory_users",
"cortex_ingester_memory_series_created_total",
"cortex_ingester_memory_series_removed_total",
}

registry := prometheus.NewRegistry()
Expand Down Expand Up @@ -278,6 +306,14 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes
# HELP cortex_ingester_memory_series The current number of series in memory.
# TYPE cortex_ingester_memory_series gauge
cortex_ingester_memory_series 2
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="test-1"} 1
cortex_ingester_memory_series_created_total{user="test-2"} 1
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test-1"} 0
cortex_ingester_memory_series_removed_total{user="test-2"} 0
`

assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
Expand Down
136 changes: 103 additions & 33 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
dto "github.com/prometheus/client_model/go"
)

const (
memSeriesCreatedTotalName = "cortex_ingester_memory_series_created_total"
memSeriesCreatedTotalHelp = "The total number of series that were created per user."

memSeriesRemovedTotalName = "cortex_ingester_memory_series_removed_total"
memSeriesRemovedTotalHelp = "The total number of series that were removed per user."
)

type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
ingestedSamples prometheus.Counter
Expand All @@ -23,7 +31,7 @@ type ingesterMetrics struct {
memSeriesRemovedTotal *prometheus.CounterVec
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *ingesterMetrics {
m := &ingesterMetrics{
flushQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_flush_queue_length",
Expand Down Expand Up @@ -68,12 +76,12 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
Help: "The current number of users in memory.",
}),
memSeriesCreatedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_memory_series_created_total",
Help: "The total number of series that were created per user.",
Name: memSeriesCreatedTotalName,
Help: memSeriesCreatedTotalHelp,
}, []string{"user"}),
memSeriesRemovedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_memory_series_removed_total",
Help: "The total number of series that were removed per user.",
Name: memSeriesRemovedTotalName,
Help: memSeriesRemovedTotalHelp,
}, []string{"user"}),
}

Expand All @@ -88,28 +96,42 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
m.queriedChunks,
m.memSeries,
m.memUsers,
m.memSeriesCreatedTotal,
m.memSeriesRemovedTotal,
)

if registerMetricsConflictingWithTSDB {
r.MustRegister(
m.memSeriesCreatedTotal,
m.memSeriesRemovedTotal,
)
}
}

return m
}

// TSDB shipper metrics. We aggregate metrics from individual TSDB shippers into
// a single set of counters, which are exposed as Cortex metrics.
type shipperMetrics struct {
// TSDB metrics. Each tenant has its own registry, that TSDB code uses.
type tsdbMetrics struct {
// We aggregate metrics from individual TSDB registries into
// a single set of counters, which are exposed as Cortex metrics.
dirSyncs *prometheus.Desc // sum(thanos_shipper_dir_syncs_total)
dirSyncFailures *prometheus.Desc // sum(thanos_shipper_dir_sync_failures_total)
uploads *prometheus.Desc // sum(thanos_shipper_uploads_total)
uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total)

// These two metrics replace metrics in ingesterMetrics, as we count them differently
memSeriesCreatedTotal *prometheus.Desc
memSeriesRemovedTotal *prometheus.Desc

// These maps drive the collection output. Key = original metric name to group.
sumCountersGlobally map[string]*prometheus.Desc
sumCountersPerUser map[string]*prometheus.Desc

regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection
regs map[string]*prometheus.Registry // One prometheus registry (used by shipper) per tenant
regs map[string]*prometheus.Registry // One prometheus registry per tenant
}

func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
m := &shipperMetrics{
func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
m := &tsdbMetrics{
regs: make(map[string]*prometheus.Registry),

dirSyncs: prometheus.NewDesc(
Expand All @@ -128,6 +150,21 @@ func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
"cortex_ingester_shipper_upload_failures_total",
"TSDB: Total number of failed object uploads",
nil, nil),

memSeriesCreatedTotal: prometheus.NewDesc(memSeriesCreatedTotalName, memSeriesCreatedTotalHelp, []string{"user"}, nil),
memSeriesRemovedTotal: prometheus.NewDesc(memSeriesRemovedTotalName, memSeriesRemovedTotalHelp, []string{"user"}, nil),
}

m.sumCountersGlobally = map[string]*prometheus.Desc{
"thanos_shipper_dir_syncs_total": m.dirSyncs,
"thanos_shipper_dir_sync_failures_total": m.dirSyncFailures,
"thanos_shipper_uploads_total": m.uploads,
"thanos_shipper_upload_failures_total": m.uploadFailures,
}

m.sumCountersPerUser = map[string]*prometheus.Desc{
"prometheus_tsdb_head_series_created_total": m.memSeriesCreatedTotal,
"prometheus_tsdb_head_series_removed_total": m.memSeriesRemovedTotal,
}

if r != nil {
Expand All @@ -136,51 +173,58 @@ func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
return m
}

func (sm *shipperMetrics) Describe(out chan<- *prometheus.Desc) {
func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
out <- sm.dirSyncs
out <- sm.dirSyncFailures
out <- sm.uploads
out <- sm.uploadFailures
out <- sm.memSeriesCreatedTotal
out <- sm.memSeriesRemovedTotal
}

func (sm *shipperMetrics) Collect(out chan<- prometheus.Metric) {
gathered := make(map[string][]*dto.MetricFamily)
func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
regs := sm.registries()
data := gatheredMetricsPerUser{}

regs := sm.shipperRegistries()
for userID, r := range regs {
m, err := r.Gather()
if err != nil {
level.Warn(util.Logger).Log("msg", "failed to gather metrics from TSDB shipper", "user", userID, "err", err)
continue
}

addToGatheredMap(gathered, m)
data.addGatheredDataForUser(userID, m)
}

// OK, we have it all. Let's build results.
out <- prometheus.MustNewConstMetric(sm.dirSyncs, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_dir_syncs_total"]))
out <- prometheus.MustNewConstMetric(sm.dirSyncFailures, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_dir_sync_failures_total"]))
out <- prometheus.MustNewConstMetric(sm.uploads, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_uploads_total"]))
out <- prometheus.MustNewConstMetric(sm.uploadFailures, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_upload_failures_total"]))
for metric, desc := range sm.sumCountersGlobally {
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, data.sumCountersAcrossAllUsers(metric))
}

for metric, desc := range sm.sumCountersPerUser {
userValues := data.sumCountersPerUser(metric)
for user, val := range userValues {
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, val, user)
}
}
}

func (sm *shipperMetrics) shipperRegistries() []*prometheus.Registry {
// make a copy of the map, so that metrics can be gathered while the new registry is being added.
func (sm *tsdbMetrics) registries() map[string]*prometheus.Registry {
sm.regsMu.RLock()
defer sm.regsMu.RUnlock()

regs := make([]*prometheus.Registry, 0, len(sm.regs))
for _, r := range sm.regs {
regs = append(regs, r)
regs := make(map[string]*prometheus.Registry, len(sm.regs))
for u, r := range sm.regs {
regs[u] = r
}
return regs
}

func (sm *shipperMetrics) newRegistryForUser(userID string) prometheus.Registerer {
reg := prometheus.NewRegistry()
func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Registry) {
sm.regsMu.Lock()
sm.regs[userID] = reg
sm.regs[userID] = registry
sm.regsMu.Unlock()
return reg
}

func sumCounters(mfs []*dto.MetricFamily) float64 {
Expand All @@ -201,11 +245,37 @@ func sumCounters(mfs []*dto.MetricFamily) float64 {
return result
}

func addToGatheredMap(all map[string][]*dto.MetricFamily, mfs []*dto.MetricFamily) {
for _, m := range mfs {
// first key = userID, second key = metric name. Value = slice of gathered values with the same metric name.
type gatheredMetricsPerUser map[string]map[string][]*dto.MetricFamily

func (d gatheredMetricsPerUser) addGatheredDataForUser(userID string, metrics []*dto.MetricFamily) {
// first, create new map which maps metric names to a slice of MetricFamily instances.
// That makes it easier to do searches later.
perMetricName := map[string][]*dto.MetricFamily{}

for _, m := range metrics {
if m.Name == nil {
continue
}
all[*m.Name] = append(all[*m.Name], m)
perMetricName[*m.Name] = append(perMetricName[*m.Name], m)
}

d[userID] = perMetricName
}

func (d gatheredMetricsPerUser) sumCountersAcrossAllUsers(counter string) float64 {
result := float64(0)
for _, perMetric := range d {
result += sumCounters(perMetric[counter])
}
return result
}

func (d gatheredMetricsPerUser) sumCountersPerUser(counter string) map[string]float64 {
result := map[string]float64{}
for user, perMetric := range d {
v := sumCounters(perMetric[counter])
result[user] = v
}
return result
}
Loading