Skip to content

Commit

Permalink
skip blocks with out-of-order chunk during compaction
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Hu <yhuz@amazon.com>
  • Loading branch information
huyan0 committed Jul 27, 2021
1 parent 83419bc commit f8b52a4
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 36 deletions.
7 changes: 4 additions & 3 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com
m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename)
}, []string{"marker", "reason"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)

m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -349,13 +349,14 @@ func runCompact(
reg,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
)
planner := compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
bkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
Expand Down
19 changes: 13 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error {
return nil
}

// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

if i.OutOfOrderSeries > 0 {
errMsg = append(errMsg, fmt.Sprintf(
func (i HealthStats) OutOfOrderChunksErr() error {
if i.OutOfOrderChunks > 0 {
return errors.New(fmt.Sprintf(
"%d/%d series have an average of %.3f out-of-order chunks: "+
"%.3f of these are exact duplicates (in terms of data and time range)",
i.OutOfOrderSeries,
Expand All @@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error {
float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks),
))
}
return nil
}

// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks)
if n > 0 {
Expand Down Expand Up @@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.OutOfOrderChunksErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/block/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package block
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) {
testutil.Ok(t, ir2.Series(p.At(), &lset, &chks))
testutil.Equals(t, 1, len(chks))
}
}

func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
blockDir, err := ioutil.TempDir("", "test-ooo-index")
testutil.Ok(t, err)

err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64)
testutil.Ok(t, err)

stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64)

testutil.Ok(t, err)
testutil.Equals(t, 1, stats.OutOfOrderChunks)
testutil.NotOk(t, stats.OutOfOrderChunksErr())
}
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
// OutOfOrderChunksNoCompactReason is a reason of to no compact block with index contains out of order chunk so that the compaction is not blocked.
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
Expand Down
53 changes: 50 additions & 3 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type DefaultGrouper struct {
verticalCompactions *prometheus.CounterVec
garbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
}

Expand All @@ -252,6 +253,7 @@ func NewDefaultGrouper(
reg prometheus.Registerer,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
) *DefaultGrouper {
return &DefaultGrouper{
Expand Down Expand Up @@ -279,9 +281,10 @@ func NewDefaultGrouper(
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"group"}),
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
}
}

Expand Down Expand Up @@ -309,6 +312,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.verticalCompactions.WithLabelValues(groupKey),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
)
if err != nil {
Expand Down Expand Up @@ -346,6 +350,7 @@ type Group struct {
verticalCompactions prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
}

Expand All @@ -365,6 +370,7 @@ func NewGroup(
verticalCompactions prometheus.Counter,
groupGarbageCollectedBlocks prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
blockMakredForNoCopmact prometheus.Counter,
hashFunc metadata.HashFunc,
) (*Group, error) {
if logger == nil {
Expand All @@ -385,6 +391,7 @@ func NewGroup(
verticalCompactions: verticalCompactions,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blockMakredForNoCopmact,
hashFunc: hashFunc,
}
return g, nil
Expand Down Expand Up @@ -541,6 +548,27 @@ func IsIssue347Error(err error) bool {
return ok
}

// OutOfOrderChunkError is a type wrapper for OOO chunk error from validating block index.
type OutOfOrderChunksError struct {
err error

id ulid.ULID
}

func (e OutOfOrderChunksError) Error() string {
return e.err.Error()
}

func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
return OutOfOrderChunksError{err: err, id: brokenBlock}
}

// IsOutOfOrderChunk returns true if the base error is a OutOfOrderChunkError.
func IsOutOfOrderChunkError(err error) bool {
_, ok := errors.Cause(err).(OutOfOrderChunksError)
return ok
}

// HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct {
err error
Expand Down Expand Up @@ -749,6 +777,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.OutOfOrderChunksErr(); err != nil {
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks should be dropped from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
Expand Down Expand Up @@ -939,6 +971,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
continue
}
}
// if block has out of order chunk, mark the block for no copmaction and continue
if IsOutOfOrderChunkError(err) {
if err := block.MarkForNoCompact(
ctx,
c.logger,
c.bkt,
err.(OutOfOrderChunksError).id,
metadata.OutOfOrderChunksNoCompactReason,
"OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
Expand Down
Loading

0 comments on commit f8b52a4

Please sign in to comment.