From 771f4339ebfae739271f6cd19edcef071a91d826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 27 Aug 2024 12:32:48 +0300 Subject: [PATCH] receive: fix taking lock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Go through the readyStore adapter so that pruning can happen properly while we are querying label values. Signed-off-by: Giedrius Statkevičius --- pkg/receive/multitsdb.go | 45 ++++++++++++++++++++++++++++++++++++++-- pkg/store/tsdb.go | 36 ++++---------------------------- test/e2e/query_test.go | 30 ++++++++++++++++++++++----- 3 files changed, 72 insertions(+), 39 deletions(-) diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 3251c2729c4..c855c5531b5 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "math" "os" "path" "path/filepath" @@ -34,6 +35,7 @@ import ( "github.com/thanos-io/thanos/pkg/api/status" "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/objstore" @@ -301,10 +303,49 @@ func (t *tenant) shipper() *shipper.Shipper { return t.ship } -func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { +func (t *tenant) startMetricNameFilter(tenantTSDB *tsdb.DB, storeTSDB *store.TSDBStore, logger log.Logger) { + if tenantTSDB == nil { + return + } + tc := time.NewTicker(15 * time.Second) + ctx, cancel := context.WithCancel(context.Background()) + + storeTSDB.SetClose(cancel) + updateMetricNames := func() { + qr, err := t.readyStorage().Querier(0, math.MaxInt64) + if err != nil { + level.Error(logger).Log("msg", "failed to update metric names", "err", err) + return + } + defer runutil.CloseWithLogOnErr(logger, qr, "querier for metric names") + lvs, _, err := qr.LabelValues(ctx, model.MetricNameLabel) + if err != nil { + level.Error(logger).Log("msg", "failed to update metric names", "err", err) + return + } + + storeTSDB.MetricNameFilter.ResetAddMetricName(lvs...) + } + updateMetricNames() + + go func() { + for { + select { + case <-tc.C: + updateMetricNames() + case <-ctx.Done(): + return + } + } + }() +} + +func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, logger log.Logger) { t.readyS.Set(tenantTSDB) t.mtx.Lock() t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB) + t.startMetricNameFilter(tenantTSDB, storeTSDB, logger) + t.mtx.Unlock() } @@ -736,7 +777,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset)) + tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset), logger) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 73788aa46bc..03d81ee6944 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -11,12 +11,9 @@ import ( "sort" "strings" "sync" - "time" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "google.golang.org/grpc" @@ -54,6 +51,10 @@ type TSDBStore struct { close func() } +func (s *TSDBStore) SetClose(c func()) { + s.close = c +} + func (s *TSDBStore) Close() { s.close() } @@ -91,35 +92,6 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI MetricNameFilter: filter.NewCuckooFilterMetricNameFilter(1000000), } - t := time.NewTicker(15 * time.Second) - ctx, cancel := context.WithCancel(context.Background()) - updateMetricNames := func() { - vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{ - Label: model.MetricNameLabel, - Start: 0, - End: math.MaxInt64, - }) - if err != nil { - level.Error(logger).Log("msg", "failed to update metric names", "err", err) - return - } - - st.MetricNameFilter.ResetAddMetricName(vals.Values...) - } - st.close = cancel - updateMetricNames() - - go func() { - for { - select { - case <-t.C: - updateMetricNames() - case <-ctx.Done(): - return - } - } - }() - return st } diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 52145cd1dac..bcd4e83d497 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -1676,11 +1676,31 @@ func queryAndAssert(t *testing.T, ctx context.Context, addr string, q func() str t.Helper() sortResults(expected) - result := instantQuery(t, ctx, addr, q, ts, opts, len(expected)) - for _, r := range result { - r.Timestamp = 0 // Does not matter for us. - } - testutil.Equals(t, expected, result) + logger := log.NewLogfmtLogger(os.Stdout) + logger = log.With(logger, "ts", log.DefaultTimestampUTC) + _ = logger.Log( + "caller", "queryAndAssert", + "query", q(), + "msg", "checking if the query returns expected results", + "expected", expected.String(), + ) + + runutil.RetryWithLog(logger, 10*time.Second, ctx.Done(), func() error { + result := instantQuery(t, ctx, addr, q, ts, opts, len(expected)) + for _, r := range result { + r.Timestamp = 0 // Does not matter for us. + } + testutil.Equals(t, expected, result) + if len(expected) != len(result) { + return fmt.Errorf("mismatching result length: %d != %d", len(expected), len(result)) + } + for i := range result { + if !result[i].Equal(expected[i]) { + return fmt.Errorf("mismatching result at %d: %s != %s", i, result[i], expected[i]) + } + } + return nil + }) } func labelNames(t *testing.T, ctx context.Context, addr string, matchers []*labels.Matcher, start, end int64, check func(res []string) bool) {