diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index b74b780176..5ea7bfcc5b 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -416,7 +416,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
 	tenantInstance.mtx.Lock()
 	shipper := tenantInstance.ship
 	tenantInstance.ship = nil
-	shipper.DisableWait()
 	tenantInstance.mtx.Unlock()
 
 	defer func() {
@@ -426,7 +425,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
 		// If the tenant was not pruned, re-enable the shipper.
 		tenantInstance.mtx.Lock()
 		tenantInstance.ship = shipper
-		shipper.Enable()
 		tenantInstance.mtx.Unlock()
 	}()
 
@@ -447,7 +445,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
 	level.Info(logger).Log("msg", "Pruning tenant")
 	if shipper != nil {
 		// No other code can reach this shipper anymore so enable it again to be able to sync manually.
-		shipper.Enable()
 		uploaded, err := shipper.Sync(ctx)
 		if err != nil {
 			return false, err
diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go
index 6f3d9bed7c..6b7be6cd12 100644
--- a/pkg/shipper/shipper.go
+++ b/pkg/shipper/shipper.go
@@ -82,8 +82,6 @@ type Shipper struct {
 	labels func() labels.Labels
 	mtx    sync.RWMutex
 
-	closed bool
-	wg     sync.WaitGroup
 }
 
 // New creates a new shipper that detects new TSDB blocks in dir and uploads them to
@@ -138,13 +136,6 @@ func (s *Shipper) SetLabels(lbls labels.Labels) {
 	s.labels = func() labels.Labels { return lbls }
 }
 
-func (s *Shipper) getLabels() labels.Labels {
-	s.mtx.RLock()
-	defer s.mtx.RUnlock()
-
-	return s.labels()
-}
-
 // Timestamps returns the minimum timestamp for which data is available and the highest timestamp
 // of blocks that were successfully uploaded.
 func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
@@ -249,30 +240,6 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo
 	return nil
 }
 
-// DisableWait disables the shipper and waits for all ongoing syncs to finish.
-// Useful when you want to sync one last time before pruning a TSDB.
-func (s *Shipper) DisableWait() {
-	if s == nil {
-		return
-	}
-	s.mtx.Lock()
-	s.closed = true
-	s.mtx.Unlock()
-	s.wg.Wait()
-}
-
-// Enable enables the shipper again.
-// Useful when you want to sync one last time before pruning a TSDB.
-// Remove all references to the shipper, call DisableWait, call Enable, and then call Sync() one last time.
-func (s *Shipper) Enable() {
-	if s == nil {
-		return
-	}
-	s.mtx.Lock()
-	s.closed = false
-	s.mtx.Unlock()
-}
-
 // Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded
 // to the object bucket once.
 //
@@ -281,14 +248,7 @@ func (s *Shipper) Enable() {
 // It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
 func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
 	s.mtx.Lock()
-	if s.closed {
-		s.mtx.Unlock()
-		return 0, nil
-	}
-	s.wg.Add(1)
-	s.mtx.Unlock()
-
-	defer s.wg.Done()
+	defer s.mtx.Unlock()
 
 	meta, err := ReadMetaFile(s.metadataFilePath)
 	if err != nil {
@@ -311,7 +271,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
 	meta.Uploaded = nil
 
 	var (
-		checker    = newLazyOverlapChecker(s.logger, s.bucket, s.getLabels)
+		checker    = newLazyOverlapChecker(s.logger, s.bucket, func() labels.Labels { return s.labels() })
 		uploadErrs int
 	)
 
@@ -433,7 +393,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
 		return errors.Wrap(err, "hard link block")
 	}
 	// Attach current labels and write a new meta file with Thanos extensions.
-	if lset := s.getLabels(); !lset.IsEmpty() {
+	if lset := s.labels(); !lset.IsEmpty() {
 		lset.Range(func(l labels.Label) {
 			meta.Thanos.Labels[l.Name] = l.Value
 		})
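The pattern this diff converges on: Shipper.Sync holds the shipper's own mutex for its entire run, so the pruning path only needs to detach the shipper under the tenant lock and then call Sync one last time; any in-flight Sync finishes first because both serialize on the same mutex, which is what made the DisableWait/Enable/WaitGroup machinery unnecessary. Below is a minimal, self-contained sketch of that pattern under stated assumptions; the shipper and tenant types and the prune function are illustrative stand-ins, not the actual Thanos code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// shipper is an illustrative stand-in for the real Shipper. Sync holds the
// shipper's mutex for its whole duration, so concurrent callers serialize
// on it instead of coordinating through a closed flag and WaitGroup.
type shipper struct {
	mtx      sync.Mutex
	uploaded int
}

func (s *shipper) Sync(ctx context.Context) (int, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	// ... detect and upload new blocks here ...
	s.uploaded++
	return 1, nil
}

// tenant is an illustrative stand-in for the per-tenant state in MultiTSDB.
type tenant struct {
	mtx  sync.Mutex
	ship *shipper
}

// prune detaches the shipper under the tenant lock and then performs one
// final, manual Sync. If a periodic Sync is still running it simply
// completes first, because both calls take ship.mtx.
func (t *tenant) prune(ctx context.Context) (int, error) {
	t.mtx.Lock()
	ship := t.ship
	t.ship = nil
	t.mtx.Unlock()

	if ship == nil {
		return 0, nil
	}
	return ship.Sync(ctx)
}

func main() {
	ten := &tenant{ship: &shipper{}}
	uploaded, err := ten.prune(context.Background())
	if err != nil {
		fmt.Println("prune failed:", err)
		return
	}
	fmt.Println("blocks uploaded by final sync:", uploaded)
}
```

The trade-off is that Sync becomes a critical section guarded by the shipper mutex rather than a concurrently countable operation, which is acceptable here because Sync was already documented as not concurrency-safe.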