diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index e5a7e29c1d6..dd493621ed6 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "go.uber.org/atomic" "golang.org/x/exp/slices" @@ -154,7 +155,34 @@ type tenant struct { exemplarsTSDB *exemplars.TSDB ship *shipper.Shipper - mtx *sync.RWMutex + mtx *sync.RWMutex + tsdb *tsdb.DB + + // For tests. + blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc +} + +func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { + t.mtx.RLock() + defer t.mtx.RUnlock() + + if t.tsdb == nil { + return nil + } + + deletable := t.blocksToDeleteFn(t.tsdb)(blocks) + if t.ship == nil { + return deletable + } + + uploaded := t.ship.UploadedBlocks() + for deletableID := range deletable { + if _, ok := uploaded[deletableID]; !ok { + delete(deletable, deletableID) + } + } + + return deletable } func newTenant() *tenant { @@ -202,14 +230,15 @@ func (t *tenant) shipper() *shipper.Shipper { func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { t.readyS.Set(tenantTSDB) t.mtx.Lock() - t.setComponents(storeTSDB, ship, exemplarsTSDB) + t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB) t.mtx.Unlock() } -func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) { +func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) { t.storeTSDB = storeTSDB t.ship = ship t.exemplarsTSDB = exemplarsTSDB + t.tsdb = tenantTSDB } func (t *MultiTSDB) Open() error { @@ -383,9 +412,12 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst tenantTSDB.mtx.Lock() defer tenantTSDB.mtx.Unlock() - // Lock the entire tenant to make sure the shipper is not running in parallel. + // Make sure the shipper is not running in parallel. tenantInstance.mtx.Lock() - defer tenantInstance.mtx.Unlock() + shipper := tenantInstance.ship + tenantInstance.ship = nil + tenantInstance.mtx.Unlock() + shipper.Close() sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds() if sinceLastAppendMillis <= compactThreshold { @@ -402,8 +434,8 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst } level.Info(logger).Log("msg", "Pruning tenant") - if tenantInstance.ship != nil { - uploaded, err := tenantInstance.ship.Sync(ctx) + if shipper != nil { + uploaded, err := shipper.Sync(ctx) if err != nil { return false, err } @@ -422,7 +454,7 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst } tenantInstance.readyS.set(nil) - tenantInstance.setComponents(nil, nil, nil) + tenantInstance.setComponents(nil, nil, nil, nil) return true, nil } @@ -574,6 +606,8 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant level.Info(logger).Log("msg", "opening TSDB") opts := *t.tsdbOpts + opts.BlocksToDelete = tenant.blocksToDelete + tenant.blocksToDeleteFn = tsdb.DefaultBlocksToDelete // NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL // which gets later converted into a block. Without setting this flag to false, the block would get compacted diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 89aeebab734..33af2ff640e 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -9,17 +9,21 @@ import ( "math" "os" "path" + "path/filepath" "strings" + "sync" "testing" "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -28,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -460,6 +465,8 @@ func TestMultiTSDBPrune(t *testing.T) { defer cancel() if test.bucket != nil { + _, err := m.Sync(ctx) + testutil.Ok(t, err) go func() { testutil.Ok(t, syncTSDBs(ctx, m, 10*time.Millisecond)) }() @@ -829,3 +836,59 @@ func BenchmarkMultiTSDB(b *testing.B) { _, _ = a.Append(0, l, int64(i), float64(i)) } } + +func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) { + tenant := &tenant{ + mtx: &sync.RWMutex{}, + } + + t.Run("no blocks", func(t *testing.T) { + require.Equal(t, (map[ulid.ULID]struct{})(nil), tenant.blocksToDelete(nil)) + }) + + tenant.tsdb = &tsdb.DB{} + + mockBlockIDs := []ulid.ULID{ + ulid.MustNew(1, nil), + ulid.MustNew(2, nil), + } + + t.Run("no shipper", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + }, tenant.blocksToDelete(nil)) + }) + + t.Run("some blocks uploaded", func(t *testing.T) { + tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc { + return func(_ []*tsdb.Block) map[ulid.ULID]struct{} { + return map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + mockBlockIDs[1]: {}, + } + } + } + + td := t.TempDir() + + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), filepath.Join(td, shipper.DefaultMetaFilename), &shipper.Meta{ + Version: shipper.MetaVersion1, + Uploaded: []ulid.ULID{mockBlockIDs[0]}, + })) + + tenant.ship = shipper.New(log.NewNopLogger(), nil, td, nil, nil, metadata.BucketUploadSource, nil, false, metadata.NoneFunc, "") + require.Equal(t, map[ulid.ULID]struct{}{ + mockBlockIDs[0]: {}, + }, tenant.blocksToDelete(nil)) + }) +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index a3b520d15ec..8b39fbc006d 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -82,6 +82,8 @@ type Shipper struct { labels func() labels.Labels mtx sync.RWMutex + closed bool + wg sync.WaitGroup } // New creates a new shipper that detects new TSDB blocks in dir and uploads them to @@ -247,6 +249,13 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo return nil } +func (s *Shipper) Close() { + s.mtx.Lock() + s.closed = true + s.mtx.Unlock() + s.wg.Wait() +} + // Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded // to the object bucket once. // @@ -254,6 +263,16 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo // // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok). func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { + s.mtx.Lock() + if s.closed { + s.mtx.Unlock() + return 0, nil + } + s.wg.Add(1) + s.mtx.Unlock() + + defer s.wg.Done() + meta, err := ReadMetaFile(s.metadataFilePath) if err != nil { // If we encounter any error, proceed with an empty meta file and overwrite it later. @@ -355,6 +374,21 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return uploaded, nil } +func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} { + meta, err := ReadMetaFile(s.metadataFilePath) + if err != nil { + // NOTE(GiedriusS): Sync() will inform users about any problems. + return nil + } + + ret := make(map[ulid.ULID]struct{}, len(meta.Uploaded)) + for _, id := range meta.Uploaded { + ret[id] = struct{}{} + } + + return ret +} + // sync uploads the block if not exists in remote storage. // TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error. func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {