Skip to content

Commit ea1a045

Browse files
pstibrany authored and pracucci committed
TSDB: add bucket store metrics in querier (#1996)
* Moved MetricFamiliesPerUser type to util package. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added prometheus registry to block stores. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Expose first set of metrics from TSDB bucket store. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Support for summaries with labels. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Send data directly to channel to avoid allocating extra slices/maps just to return results. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added more summaries. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added remaining metrics from TSDB Bucket Store. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Extracted TSDB bucket store metrics into separate type Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added test for bucket_store_metrics_test. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added test for bucket_store_metrics_test. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Gather and report metrics from Thanos' storecache.InMemoryIndexCache. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Extracted common code that builds MetricFamiliesPerUser Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added benchmarks We may want to optimize this in the future. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Updated CHANGELOG.md Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added tests to sum and getMetricsWithLabelNames functions. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Fixes. Fixed sum for nil input. Extracted common code for sum of counters/gauges with labels. Fixed gauge output (was reported as counter) Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Replaced cortex_bucket_store prefix with cortex_querier_bucket_store. 
Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Removed uninteresting cortex_querier_bucket_store_sent_chunk_size_bytes metric. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Replaced cortex_store_index_cache prefix with cortex_querier_blocks_index_cache Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Group metrics registration, and register only into non-nil registry. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Make message generic. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Ignore result and error and make lint happy. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added test for getMetricsWithLabelNames with no labels. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Comment about missing m1 in test output. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Fixed duplicate entry in CHANGELOG.md Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Use single call. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Typo Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 355dff6 commit ea1a045

File tree

9 files changed

+1101
-118
lines changed

9 files changed

+1101
-118
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ instructions below to upgrade your Postgres.
3333
* [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917
3434
* `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`
3535
* [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983
36+
* [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in-memory index cache (too many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996
3637
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
3738
* [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921
3839
* [BUGFIX] Reduce memory usage when ingester Push() errors. #1922

pkg/ingester/metrics.go

Lines changed: 7 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import (
44
"sync"
55

66
"github.com/cortexproject/cortex/pkg/util"
7-
"github.com/go-kit/kit/log/level"
87
"github.com/prometheus/client_golang/prometheus"
9-
dto "github.com/prometheus/client_model/go"
108
)
119

1210
const (
@@ -122,10 +120,6 @@ type tsdbMetrics struct {
122120
memSeriesCreatedTotal *prometheus.Desc
123121
memSeriesRemovedTotal *prometheus.Desc
124122

125-
// These maps drive the collection output. Key = original metric name to group.
126-
sumCountersGlobally map[string]*prometheus.Desc
127-
sumCountersPerUser map[string]*prometheus.Desc
128-
129123
regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection
130124
regs map[string]*prometheus.Registry // One prometheus registry per tenant
131125
}
@@ -155,18 +149,6 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
155149
memSeriesRemovedTotal: prometheus.NewDesc(memSeriesRemovedTotalName, memSeriesRemovedTotalHelp, []string{"user"}, nil),
156150
}
157151

158-
m.sumCountersGlobally = map[string]*prometheus.Desc{
159-
"thanos_shipper_dir_syncs_total": m.dirSyncs,
160-
"thanos_shipper_dir_sync_failures_total": m.dirSyncFailures,
161-
"thanos_shipper_uploads_total": m.uploads,
162-
"thanos_shipper_upload_failures_total": m.uploadFailures,
163-
}
164-
165-
m.sumCountersPerUser = map[string]*prometheus.Desc{
166-
"prometheus_tsdb_head_series_created_total": m.memSeriesCreatedTotal,
167-
"prometheus_tsdb_head_series_removed_total": m.memSeriesRemovedTotal,
168-
}
169-
170152
if r != nil {
171153
r.MustRegister(m)
172154
}
@@ -183,30 +165,16 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
183165
}
184166

185167
func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
186-
regs := sm.registries()
187-
data := gatheredMetricsPerUser{}
188-
189-
for userID, r := range regs {
190-
m, err := r.Gather()
191-
if err != nil {
192-
level.Warn(util.Logger).Log("msg", "failed to gather metrics from TSDB shipper", "user", userID, "err", err)
193-
continue
194-
}
195-
196-
data.addGatheredDataForUser(userID, m)
197-
}
168+
data := util.BuildMetricFamiliesPerUserFromUserRegistries(sm.registries())
198169

199170
// OK, we have it all. Let's build results.
200-
for metric, desc := range sm.sumCountersGlobally {
201-
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, data.sumCountersAcrossAllUsers(metric))
202-
}
171+
data.SendSumOfCounters(out, sm.dirSyncs, "thanos_shipper_dir_syncs_total")
172+
data.SendSumOfCounters(out, sm.dirSyncFailures, "thanos_shipper_dir_sync_failures_total")
173+
data.SendSumOfCounters(out, sm.uploads, "thanos_shipper_uploads_total")
174+
data.SendSumOfCounters(out, sm.uploadFailures, "thanos_shipper_upload_failures_total")
203175

204-
for metric, desc := range sm.sumCountersPerUser {
205-
userValues := data.sumCountersPerUser(metric)
206-
for user, val := range userValues {
207-
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, val, user)
208-
}
209-
}
176+
data.SendSumOfCountersPerUser(out, sm.memSeriesCreatedTotal, "prometheus_tsdb_head_series_created_total")
177+
data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total")
210178
}
211179

212180
// make a copy of the map, so that metrics can be gathered while the new registry is being added.
@@ -226,56 +194,3 @@ func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Re
226194
sm.regs[userID] = registry
227195
sm.regsMu.Unlock()
228196
}
229-
230-
func sumCounters(mfs []*dto.MetricFamily) float64 {
231-
result := float64(0)
232-
for _, mf := range mfs {
233-
if mf.Type == nil || *mf.Type != dto.MetricType_COUNTER {
234-
continue
235-
}
236-
237-
for _, m := range mf.Metric {
238-
if m == nil || m.Counter == nil || m.Counter.Value == nil {
239-
continue
240-
}
241-
242-
result += *m.Counter.Value
243-
}
244-
}
245-
return result
246-
}
247-
248-
// first key = userID, second key = metric name. Value = slice of gathered values with the same metric name.
249-
type gatheredMetricsPerUser map[string]map[string][]*dto.MetricFamily
250-
251-
func (d gatheredMetricsPerUser) addGatheredDataForUser(userID string, metrics []*dto.MetricFamily) {
252-
// first, create new map which maps metric names to a slice of MetricFamily instances.
253-
// That makes it easier to do searches later.
254-
perMetricName := map[string][]*dto.MetricFamily{}
255-
256-
for _, m := range metrics {
257-
if m.Name == nil {
258-
continue
259-
}
260-
perMetricName[*m.Name] = append(perMetricName[*m.Name], m)
261-
}
262-
263-
d[userID] = perMetricName
264-
}
265-
266-
func (d gatheredMetricsPerUser) sumCountersAcrossAllUsers(counter string) float64 {
267-
result := float64(0)
268-
for _, perMetric := range d {
269-
result += sumCounters(perMetric[counter])
270-
}
271-
return result
272-
}
273-
274-
func (d gatheredMetricsPerUser) sumCountersPerUser(counter string) map[string]float64 {
275-
result := map[string]float64{}
276-
for user, perMetric := range d {
277-
v := sumCounters(perMetric[counter])
278-
result[user] = v
279-
}
280-
return result
281-
}

pkg/ingester/metrics_test.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,14 @@ import (
1010
)
1111

1212
func TestTSDBMetrics(t *testing.T) {
13-
mainReg := prometheus.NewRegistry()
13+
mainReg := prometheus.NewPedanticRegistry()
1414

1515
tsdbMetrics := newTSDBMetrics(mainReg)
1616

1717
tsdbMetrics.setRegistryForUser("user1", populateTSDBMetrics(12345))
1818
tsdbMetrics.setRegistryForUser("user2", populateTSDBMetrics(85787))
1919
tsdbMetrics.setRegistryForUser("user3", populateTSDBMetrics(999))
2020

21-
metricNames := []string{
22-
"cortex_ingester_shipper_dir_syncs_total",
23-
"cortex_ingester_shipper_dir_sync_failures_total",
24-
"cortex_ingester_shipper_uploads_total",
25-
"cortex_ingester_shipper_upload_failures_total",
26-
"cortex_ingester_memory_series_created_total",
27-
"cortex_ingester_memory_series_removed_total",
28-
}
29-
3021
err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
3122
# HELP cortex_ingester_shipper_dir_syncs_total TSDB: Total dir sync attempts
3223
# TYPE cortex_ingester_shipper_dir_syncs_total counter
@@ -61,7 +52,7 @@ func TestTSDBMetrics(t *testing.T) {
6152
cortex_ingester_memory_series_removed_total{user="user1"} 74070
6253
cortex_ingester_memory_series_removed_total{user="user2"} 514722
6354
cortex_ingester_memory_series_removed_total{user="user3"} 5994
64-
`), metricNames...)
55+
`))
6556
require.NoError(t, err)
6657
}
6758

pkg/querier/block.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@ func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, r prometheus.Regis
3939
}),
4040
}
4141

42-
r.MustRegister(b.syncTimes)
43-
4442
us, err := NewUserStore(cfg, logLevel, util.Logger)
4543
if err != nil {
4644
return nil, err
4745
}
4846
b.us = us
4947

48+
if r != nil {
49+
r.MustRegister(b.syncTimes, us.tsdbMetrics)
50+
}
51+
5052
level.Info(util.Logger).Log("msg", "synchronizing TSDB blocks for all users")
5153
if err := us.InitialSync(context.Background()); err != nil {
5254
level.Warn(util.Logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)

pkg/querier/block_store.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1313
"github.com/go-kit/kit/log"
1414
"github.com/go-kit/kit/log/level"
15+
"github.com/prometheus/client_golang/prometheus"
1516
"github.com/thanos-io/thanos/pkg/model"
1617
"github.com/thanos-io/thanos/pkg/objstore"
1718
"github.com/thanos-io/thanos/pkg/store"
@@ -24,12 +25,13 @@ import (
2425

2526
// UserStore is a multi-tenant version of Thanos BucketStore
2627
type UserStore struct {
27-
logger log.Logger
28-
cfg tsdb.Config
29-
bucket objstore.BucketReader
30-
stores map[string]*store.BucketStore
31-
client storepb.StoreClient
32-
logLevel logging.Level
28+
logger log.Logger
29+
cfg tsdb.Config
30+
bucket objstore.BucketReader
31+
stores map[string]*store.BucketStore
32+
client storepb.StoreClient
33+
logLevel logging.Level
34+
tsdbMetrics *tsdbBucketStoreMetrics
3335
}
3436

3537
// NewUserStore returns a new UserStore
@@ -40,11 +42,12 @@ func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (*
4042
}
4143

4244
u := &UserStore{
43-
logger: logger,
44-
cfg: cfg,
45-
bucket: bkt,
46-
stores: make(map[string]*store.BucketStore),
47-
logLevel: logLevel,
45+
logger: logger,
46+
cfg: cfg,
47+
bucket: bkt,
48+
stores: map[string]*store.BucketStore{},
49+
logLevel: logLevel,
50+
tsdbMetrics: newTSDBBucketStoreMetrics(),
4851
}
4952

5053
serv := grpc.NewServer()
@@ -115,9 +118,11 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
115118
Bucket: bkt,
116119
}
117120

121+
reg := prometheus.NewRegistry()
122+
118123
indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes
119124
maxItemSizeBytes := indexCacheSizeBytes / 2
120-
indexCache, err := storecache.NewInMemoryIndexCache(u.logger, nil, storecache.Opts{
125+
indexCache, err := storecache.NewInMemoryIndexCache(u.logger, reg, storecache.Opts{
121126
MaxSizeBytes: indexCacheSizeBytes,
122127
MaxItemSizeBytes: maxItemSizeBytes,
123128
})
@@ -126,7 +131,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
126131
}
127132
bs, err = store.NewBucketStore(
128133
u.logger,
129-
nil,
134+
reg,
130135
userBkt,
131136
filepath.Join(u.cfg.BucketStore.SyncDir, user),
132137
indexCache,
@@ -147,6 +152,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
147152
}
148153

149154
u.stores[user] = bs
155+
u.tsdbMetrics.addUserRegistry(user, reg)
150156
}
151157

152158
wg.Add(1)

0 commit comments

Comments
 (0)