Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed May 16, 2021
1 parent d9eea7b commit e4c4f82
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
testutil.Ok(t, err)

planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000})
Expand Down
42 changes: 22 additions & 20 deletions pkg/compact/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
)

// NewDedupChunkSeriesMerger merges several chunk series into one.
// Deduplication is based on penalty based deduplication algorithm without handling counter reset.
func NewDedupChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc {
return func(series ...storage.ChunkSeries) storage.ChunkSeries {
if len(series) == 0 {
return nil
}
if len(series) == 1 {
return series[0]
}
return &storage.ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
Expand All @@ -36,9 +41,6 @@ type dedupChunksIterator struct {
}

func newDedupChunksIterator(replicas ...storage.ChunkSeries) chunks.Iterator {
if len(replicas) == 1 {
return replicas[0].Iterator()
}
iterators := make([]chunks.Iterator, 0, len(replicas))
for _, s := range replicas {
iterators = append(iterators, s.Iterator())
Expand Down Expand Up @@ -78,33 +80,33 @@ func (d *dedupChunksIterator) At() chunks.Meta {
return d.curr
}

func (c *dedupChunksIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
func (d *dedupChunksIterator) Next() bool {
if d.h == nil {
for _, iter := range d.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
heap.Push(&d.h, iter)
}
}
}
if len(c.h) == 0 {
if len(d.h) == 0 {
return false
}

iter := heap.Pop(&c.h).(chunks.Iterator)
c.curr = iter.At()
iter := heap.Pop(&d.h).(chunks.Iterator)
d.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
heap.Push(&d.h, iter)
}

var (
overlapping []storage.Series
oMaxTime = c.curr.MaxTime
oMaxTime = d.curr.MaxTime
)

// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
for len(c.h) > 0 {
for len(d.h) > 0 {
// Get the next oldest chunk by min, then max time.
next := c.h[0].At()
next := d.h[0].At()
if next.MinTime > oMaxTime {
// No overlap with current one.
break
Expand All @@ -116,9 +118,9 @@ func (c *dedupChunksIterator) Next() bool {
oMaxTime = next.MaxTime
}

iter := heap.Pop(&c.h).(chunks.Iterator)
iter := heap.Pop(&d.h).(chunks.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
heap.Push(&d.h, iter)
}
}
if len(overlapping) == 0 {
Expand All @@ -129,7 +131,7 @@ func (c *dedupChunksIterator) Next() bool {
iter = (&seriesToChunkEncoder{Series: &storage.SeriesEntry{
Lset: nil,
SampleIteratorFn: func() chunkenc.Iterator {
it := newChunkToSeriesDecoder(nil, c.curr).Iterator()
it := newChunkToSeriesDecoder(nil, d.curr).Iterator()

for _, o := range overlapping {
it = newDedupSamplesIterator(it, o.Iterator())
Expand All @@ -138,14 +140,14 @@ func (c *dedupChunksIterator) Next() bool {
},
}}).Iterator()
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
if d.err = iter.Err(); d.err != nil {
return false
}
panic("unexpected seriesToChunkEncoder lack of iterations")
}
c.curr = iter.At()
d.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
heap.Push(&d.h, iter)
}
return true
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
}

// This mimics our default ExponentialBlockRanges with min block size equals to 20.
tsdbComp, err := tsdb.NewLeveledCompactor(context.Background(), nil, nil, ranges, nil)
tsdbComp, err := tsdb.NewLeveledCompactor(context.Background(), nil, nil, ranges, nil, nil)
testutil.Ok(t, err)
tsdbPlanner := &tsdbPlannerAdapter{comp: tsdbComp}
tsdbBasedPlanner := NewTSDBBasedPlanner(log.NewNopLogger(), ranges)
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
}

// This mimics our default ExponentialBlockRanges with min block size equals to 20.
tsdbComp, err := tsdb.NewLeveledCompactor(context.Background(), nil, nil, ranges, nil)
tsdbComp, err := tsdb.NewLeveledCompactor(context.Background(), nil, nil, ranges, nil, nil)
testutil.Ok(t, err)
tsdbPlanner := &tsdbPlannerAdapter{comp: tsdbComp}
tsdbBasedPlanner := NewTSDBBasedPlanner(log.NewNopLogger(), ranges)
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,7 +1070,7 @@ func appendTestData(t testing.TB, app storage.Appender, series int) {
}

func createBlockFromHead(t testing.TB, dir string, head *tsdb.Head) ulid.ULID {
compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, os.MkdirAll(dir, 0777))
Expand Down Expand Up @@ -1907,7 +1907,7 @@ func createBlockWithLargeChunk(t testutil.TB, dir string, lbls labels.Labels, ra
b2 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 11, random, 1000)

// Merge the blocks together.
compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
testutil.Ok(t, err)

blocksToCompact := []string{filepath.Join(dir, b1.String()), filepath.Join(dir, b2.String())}
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutil/e2eutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func createBlock(
if err := g.Wait(); err != nil {
return id, err
}
c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil)
c, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{maxt - mint}, nil, nil)
if err != nil {
return id, errors.Wrap(err, "create compactor")
}
Expand Down

0 comments on commit e4c4f82

Please sign in to comment.