Skip to content

Commit

Permalink
shipper/receive: just use a single lock
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Apr 2, 2024
1 parent 36f5ff1 commit 48f502c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 46 deletions.
3 changes: 0 additions & 3 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
tenantInstance.mtx.Lock()
shipper := tenantInstance.ship
tenantInstance.ship = nil
shipper.DisableWait()
tenantInstance.mtx.Unlock()

defer func() {
Expand All @@ -426,7 +425,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
// If the tenant was not pruned, re-enable the shipper.
tenantInstance.mtx.Lock()
tenantInstance.ship = shipper
shipper.Enable()
tenantInstance.mtx.Unlock()
}()

Expand All @@ -447,7 +445,6 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
level.Info(logger).Log("msg", "Pruning tenant")
if shipper != nil {
// No other code can reach this shipper anymore so enable it again to be able to sync manually.
shipper.Enable()
uploaded, err := shipper.Sync(ctx)
if err != nil {
return false, err
Expand Down
46 changes: 3 additions & 43 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ type Shipper struct {

labels func() labels.Labels
mtx sync.RWMutex
closed bool
wg sync.WaitGroup
}

// New creates a new shipper that detects new TSDB blocks in dir and uploads them to
Expand Down Expand Up @@ -138,13 +136,6 @@ func (s *Shipper) SetLabels(lbls labels.Labels) {
s.labels = func() labels.Labels { return lbls }
}

func (s *Shipper) getLabels() labels.Labels {
s.mtx.RLock()
defer s.mtx.RUnlock()

return s.labels()
}

// Timestamps returns the minimum timestamp for which data is available and the highest timestamp
// of blocks that were successfully uploaded.
func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
Expand Down Expand Up @@ -249,30 +240,6 @@ func (c *lazyOverlapChecker) IsOverlapping(ctx context.Context, newMeta tsdb.Blo
return nil
}

// DisableWait disables the shipper and waits for all ongoing syncs to finish.
// Useful when you want to sync one last time before pruning a TSDB.
func (s *Shipper) DisableWait() {
if s == nil {
return
}
s.mtx.Lock()
s.closed = true
s.mtx.Unlock()
s.wg.Wait()
}

// Enable enables the shipper again.
// Useful when you want to sync one last time before pruning a TSDB.
// Remove all references to the shipper, call DisableWait, call Enable, and then call Sync() one last time.
func (s *Shipper) Enable() {
if s == nil {
return
}
s.mtx.Lock()
s.closed = false
s.mtx.Unlock()
}

// Sync performs a single synchronization, which ensures all non-compacted local blocks have been uploaded
// to the object bucket once.
//
Expand All @@ -281,14 +248,7 @@ func (s *Shipper) Enable() {
// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
s.mtx.Lock()
if s.closed {
s.mtx.Unlock()
return 0, nil
}
s.wg.Add(1)
s.mtx.Unlock()

defer s.wg.Done()
defer s.mtx.Unlock()

meta, err := ReadMetaFile(s.metadataFilePath)
if err != nil {
Expand All @@ -311,7 +271,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
meta.Uploaded = nil

var (
checker = newLazyOverlapChecker(s.logger, s.bucket, s.getLabels)
checker = newLazyOverlapChecker(s.logger, s.bucket, func() labels.Labels { return s.labels() })
uploadErrs int
)

Expand Down Expand Up @@ -433,7 +393,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
return errors.Wrap(err, "hard link block")
}
// Attach current labels and write a new meta file with Thanos extensions.
if lset := s.getLabels(); !lset.IsEmpty() {
if lset := s.labels(); !lset.IsEmpty() {
lset.Range(func(l labels.Label) {
meta.Thanos.Labels[l.Name] = l.Value
})
Expand Down

0 comments on commit 48f502c

Please sign in to comment.