Skip to content

Commit

Permalink
receive: fix taking lock
Browse files Browse the repository at this point in the history
Go through the readyStore adapter so that pruning can happen properly
while we are querying label values.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Aug 27, 2024
1 parent e0221e6 commit 771f433
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 39 deletions.
45 changes: 43 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -34,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/filter"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/runutil"

"github.com/thanos-io/objstore"

Expand Down Expand Up @@ -301,10 +303,49 @@ func (t *tenant) shipper() *shipper.Shipper {
return t.ship
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
func (t *tenant) startMetricNameFilter(tenantTSDB *tsdb.DB, storeTSDB *store.TSDBStore, logger log.Logger) {
if tenantTSDB == nil {
return
}
tc := time.NewTicker(15 * time.Second)
ctx, cancel := context.WithCancel(context.Background())

storeTSDB.SetClose(cancel)
updateMetricNames := func() {
qr, err := t.readyStorage().Querier(0, math.MaxInt64)
if err != nil {
level.Error(logger).Log("msg", "failed to update metric names", "err", err)
return
}
defer runutil.CloseWithLogOnErr(logger, qr, "querier for metric names")
lvs, _, err := qr.LabelValues(ctx, model.MetricNameLabel)
if err != nil {
level.Error(logger).Log("msg", "failed to update metric names", "err", err)
return
}

storeTSDB.MetricNameFilter.ResetAddMetricName(lvs...)
}
updateMetricNames()

go func() {
for {
select {
case <-tc.C:
updateMetricNames()
case <-ctx.Done():
return
}
}
}()
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, logger log.Logger) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB)
t.startMetricNameFilter(tenantTSDB, storeTSDB, logger)

t.mtx.Unlock()
}

Expand Down Expand Up @@ -736,7 +777,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
shipper.DefaultMetaFilename,
)
}
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset))
tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset), logger)
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
36 changes: 4 additions & 32 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"google.golang.org/grpc"
Expand Down Expand Up @@ -54,6 +51,10 @@ type TSDBStore struct {
close func()
}

func (s *TSDBStore) SetClose(c func()) {
s.close = c
}

func (s *TSDBStore) Close() {
s.close()
}
Expand Down Expand Up @@ -91,35 +92,6 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
MetricNameFilter: filter.NewCuckooFilterMetricNameFilter(1000000),
}

t := time.NewTicker(15 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
updateMetricNames := func() {
vals, err := st.LabelValues(context.Background(), &storepb.LabelValuesRequest{
Label: model.MetricNameLabel,
Start: 0,
End: math.MaxInt64,
})
if err != nil {
level.Error(logger).Log("msg", "failed to update metric names", "err", err)
return
}

st.MetricNameFilter.ResetAddMetricName(vals.Values...)
}
st.close = cancel
updateMetricNames()

go func() {
for {
select {
case <-t.C:
updateMetricNames()
case <-ctx.Done():
return
}
}
}()

return st
}

Expand Down
30 changes: 25 additions & 5 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1676,11 +1676,31 @@ func queryAndAssert(t *testing.T, ctx context.Context, addr string, q func() str
t.Helper()

sortResults(expected)
result := instantQuery(t, ctx, addr, q, ts, opts, len(expected))
for _, r := range result {
r.Timestamp = 0 // Does not matter for us.
}
testutil.Equals(t, expected, result)
logger := log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
_ = logger.Log(
"caller", "queryAndAssert",
"query", q(),
"msg", "checking if the query returns expected results",
"expected", expected.String(),
)

runutil.RetryWithLog(logger, 10*time.Second, ctx.Done(), func() error {

Check failure on line 1688 in test/e2e/query_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

Error return value of `runutil.RetryWithLog` is not checked (errcheck)
result := instantQuery(t, ctx, addr, q, ts, opts, len(expected))
for _, r := range result {
r.Timestamp = 0 // Does not matter for us.
}
testutil.Equals(t, expected, result)
if len(expected) != len(result) {
return fmt.Errorf("mismatching result length: %d != %d", len(expected), len(result))
}
for i := range result {
if !result[i].Equal(expected[i]) {
return fmt.Errorf("mismatching result at %d: %s != %s", i, result[i], expected[i])
}
}
return nil
})
}

func labelNames(t *testing.T, ctx context.Context, addr string, matchers []*labels.Matcher, start, end int64, check func(res []string) bool) {
Expand Down

0 comments on commit 771f433

Please sign in to comment.