Compactor: Un-export symbols that don't need to be exported #7317

Merged 1 commit on Feb 7, 2024
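The change is mechanical but rests on one rule: in Go, an identifier starting with an upper-case letter is exported and visible to every importing package, while a lower-case initial keeps it private to its own package. A minimal, illustrative sketch of the shape this PR produces (not part of the diff):

```go
// Visibility sketch using names from this PR; illustrative only.
package compactor

// metaSyncer (formerly the exported Syncer) can now only be referenced
// from inside the compactor package, so its API is free to change
// without breaking external callers.
type metaSyncer struct{}

// BucketCompactor stays exported because other packages construct it;
// it can still hold the unexported type in an internal field.
type BucketCompactor struct {
	sy *metaSyncer
}
```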
63 changes: 29 additions & 34 deletions pkg/compactor/bucket_compactor.go
@@ -37,23 +37,23 @@ import (
var errCompactionIterationCancelled = cancellation.NewErrorf("compaction iteration cancelled")
var errCompactionIterationStopped = cancellation.NewErrorf("compaction iteration stopped")

type DeduplicateFilter interface {
type deduplicateFilter interface {
block.MetadataFilter

// DuplicateIDs returns IDs of duplicate blocks generated by the last call to the Filter method.
DuplicateIDs() []ulid.ULID
}

// Syncer synchronizes block metas from a bucket into a local directory.
// metaSyncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
type metaSyncer struct {
logger log.Logger
bkt objstore.Bucket
fetcher *block.MetaFetcher
mtx sync.Mutex
blocks map[ulid.ULID]*block.Meta
metrics *syncerMetrics
deduplicateBlocksFilter DeduplicateFilter
deduplicateBlocksFilter deduplicateFilter
}

type syncerMetrics struct {
@@ -85,13 +85,13 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion prometh
return &m
}

// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// newMetaSyncer returns a new metaSyncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay to be considered.
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter DeduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*Syncer, error) {
func newMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher *block.MetaFetcher, deduplicateBlocksFilter deduplicateFilter, blocksMarkedForDeletion prometheus.Counter) (*metaSyncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
return &metaSyncer{
logger: logger,
bkt: bkt,
fetcher: fetcher,
@@ -102,7 +102,7 @@ func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bu
}

// SyncMetas synchronizes the local state of block metas with what we have in the bucket.
func (s *Syncer) SyncMetas(ctx context.Context) error {
func (s *metaSyncer) SyncMetas(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

@@ -117,7 +117,7 @@ func (s *Syncer) SyncMetas(ctx context.Context) error {
}

// Metas returns loaded metadata blocks since last sync.
func (s *Syncer) Metas() map[ulid.ULID]*block.Meta {
func (s *metaSyncer) Metas() map[ulid.ULID]*block.Meta {
s.mtx.Lock()
defer s.mtx.Unlock()

@@ -127,7 +127,7 @@ func (s *Syncer) Metas() map[ulid.ULID]*block.Meta {
// GarbageCollect marks blocks for deletion from bucket if their data is available as part of a
// block with a higher compaction level.
// A call to SyncMetas function is required to populate duplicateIDs in duplicateBlocksFilter.
func (s *Syncer) GarbageCollect(ctx context.Context) error {
func (s *metaSyncer) GarbageCollect(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

@@ -171,12 +171,8 @@ type Grouper interface {
Groups(blocks map[ulid.ULID]*block.Meta) (res []*Job, err error)
}

// DefaultGroupKey returns a unique identifier for the group the block belongs to, based on
// defaultGroupKey returns a unique identifier for the group the block belongs to, based on
// the DefaultGrouper logic. It considers the downsampling resolution and the block's labels.
func DefaultGroupKey(meta block.ThanosMeta) string {
return defaultGroupKey(meta.Downsample.Resolution, labels.FromMap(meta.Labels))
}

func defaultGroupKey(res int64, lbls labels.Labels) string {
return fmt.Sprintf("%d@%v", res, labels.StableHash(lbls))
}
@@ -313,7 +309,10 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return issue347Error(errors.Wrapf(err, "invalid, but repairable block %s", bdir), meta.ULID)
return issue347Error{
err: errors.Wrapf(err, "invalid, but repairable block %s", bdir),
id: meta.ULID,
}
}

if err := stats.OutOfOrderLabelsErr(); err != nil {
@@ -503,25 +502,21 @@ type ulidWithShardIndex struct {
shardIndex int
}

// Issue347Error is a type wrapper for errors that should invoke the repair process for broken block.
type Issue347Error struct {
// issue347Error is a type wrapper for errors that should invoke the repair process for broken block.
type issue347Error struct {
err error
id ulid.ULID
}

func issue347Error(err error, brokenBlock ulid.ULID) Issue347Error {
return Issue347Error{err: err, id: brokenBlock}
}

func (e Issue347Error) Error() string {
func (e issue347Error) Error() string {
return fmt.Sprintf("%s (block: %s)", e.err.Error(), e.id.String())
}

// IsIssue347Error returns true if the base error is an Issue347Error.
func IsIssue347Error(err error) (bool, Issue347Error) {
var issue347Err Issue347Error
ok := errors.As(err, &issue347Err)
return ok, issue347Err
// isIssue347Error returns true if the base error is an issue347Error.
func isIssue347Error(err error) (bool, issue347Error) {
var ie issue347Error
ok := errors.As(err, &ie)
return ok, ie
}

// OutOfOrderChunksError is a type wrapper for OOO chunk error from validating block index.
@@ -566,8 +561,8 @@ func IsCriticalError(err error) (bool, CriticalError) {
return ok, criticalErr
}

// RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, ie Issue347Error) error {
// repairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error.
func repairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, blocksMarkedForDeletion prometheus.Counter, ie issue347Error) error {
level.Info(logger).Log("msg", "Repairing block broken by https://github.com/prometheus/tsdb/issues/347", "id", ie.id, "err", ie)

tmpdir, err := os.MkdirTemp("", fmt.Sprintf("repair-issue-347-id-%s-", ie.id))
@@ -696,7 +691,7 @@ var ownAllJobs = func(job *Job) (bool, error) {
// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
logger log.Logger
sy *Syncer
sy *metaSyncer
grouper Grouper
comp Compactor
planner Planner
@@ -714,7 +709,7 @@ type BucketCompactor struct {
// NewBucketCompactor creates a new bucket compactor.
func NewBucketCompactor(
logger log.Logger,
sy *Syncer,
sy *metaSyncer,
grouper Grouper,
planner Planner,
comp Compactor,
@@ -820,8 +815,8 @@ func (c *BucketCompactor) Compact(ctx context.Context, maxCompactionTime time.Du
// At this point the compaction has failed.
c.metrics.groupCompactionRunsFailed.Inc()

if ok, issue347Err := IsIssue347Error(err); ok {
if err := RepairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, issue347Err); err == nil {
if ok, issue347Err := isIssue347Error(err); ok {
if err := repairIssue347(workCtx, c.logger, c.bkt, c.sy.metrics.blocksMarkedForDeletion, issue347Err); err == nil {
mtx.Lock()
finishedAllJobs = false
mtx.Unlock()
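Beyond the renames, bucket_compactor.go drops the `issue347Error(...)` constructor in favour of plain struct literals, keeping a value-type error that callers recover with `errors.As`. A self-contained sketch of that same pattern, using a hypothetical error type and only the standard-library `errors` package:

```go
// Sketch of the error pattern used above: a value-type error carrying an
// inner error plus a block ID, built as a struct literal and later
// recovered with errors.As. Names are hypothetical, not from the PR.
package main

import (
	"errors"
	"fmt"
)

type repairableBlockError struct {
	err error
	id  string
}

func (e repairableBlockError) Error() string {
	return fmt.Sprintf("%s (block: %s)", e.err.Error(), e.id)
}

func main() {
	// Somewhere deep in a compaction run the error gets wrapped further.
	err := fmt.Errorf("compaction job failed: %w", repairableBlockError{
		err: errors.New("invalid, but repairable block"),
		id:  "01HNEXAMPLEULID",
	})

	// The caller only checks whether a repairable error is in the chain.
	var rbe repairableBlockError
	if errors.As(err, &rbe) {
		fmt.Println("would trigger repair for block", rbe.id)
	}
}
```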
8 changes: 4 additions & 4 deletions pkg/compactor/bucket_compactor_e2e_test.go
@@ -122,7 +122,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
require.NoError(t, err)

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, blocksMarkedForDeletion)
sy, err := newMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, blocksMarkedForDeletion)
require.NoError(t, err)

// Do one initial synchronization with the bucket.
@@ -229,7 +229,7 @@ func TestGroupCompactE2E(t *testing.T) {
require.NoError(t, err)

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, blocksMarkedForDeletion)
sy, err := newMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, blocksMarkedForDeletion)
require.NoError(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
@@ -368,7 +368,7 @@ func TestGroupCompactE2E(t *testing.T) {
return err
}

others[DefaultGroupKey(meta.Thanos)] = meta
others[defaultGroupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels))] = meta
return nil
}))

@@ -500,7 +500,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
})
require.NoError(t, err)

sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, blocksMarkedForDeletion)
sy, err := newMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, blocksMarkedForDeletion)
require.NoError(t, err)

// Do one initial synchronization with the bucket.
16 changes: 8 additions & 8 deletions pkg/compactor/bucket_compactor_test.go
@@ -27,7 +27,7 @@ import (
"github.com/grafana/mimir/pkg/util/extprom"
)

func TestGroupKey(t *testing.T) {
func TestDefaultGroupKey(t *testing.T) {
for _, tcase := range []struct {
input block.ThanosMeta
expected string
@@ -59,7 +59,7 @@ func TestGroupKey(t *testing.T) {
},
} {
if ok := t.Run("", func(t *testing.T) {
assert.Equal(t, tcase.expected, DefaultGroupKey(tcase.input))
assert.Equal(t, tcase.expected, defaultGroupKey(tcase.input.Downsample.Resolution, labels.FromMap(tcase.input.Labels)))
}); !ok {
return
}
@@ -82,10 +82,10 @@ func TestGroupMaxMinTime(t *testing.T) {
func TestBucketCompactor_FilterOwnJobs(t *testing.T) {
jobsFn := func() []*Job {
return []*Job{
NewJob("user", "key1", labels.EmptyLabels(), 0, false, 0, ""),
NewJob("user", "key2", labels.EmptyLabels(), 0, false, 0, ""),
NewJob("user", "key3", labels.EmptyLabels(), 0, false, 0, ""),
NewJob("user", "key4", labels.EmptyLabels(), 0, false, 0, ""),
newJob("user", "key1", labels.EmptyLabels(), 0, false, 0, ""),
newJob("user", "key2", labels.EmptyLabels(), 0, false, 0, ""),
newJob("user", "key3", labels.EmptyLabels(), 0, false, 0, ""),
newJob("user", "key4", labels.EmptyLabels(), 0, false, 0, ""),
}
}

@@ -132,15 +132,15 @@ func TestBucketCompactor_FilterOwnJobs(t *testing.T) {
}

func TestBlockMaxTimeDeltas(t *testing.T) {
j1 := NewJob("user", "key1", labels.EmptyLabels(), 0, false, 0, "")
j1 := newJob("user", "key1", labels.EmptyLabels(), 0, false, 0, "")
require.NoError(t, j1.AppendMeta(&block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: 1500002700159,
MaxTime: 1500002800159,
},
}))

j2 := NewJob("user", "key2", labels.EmptyLabels(), 0, false, 0, "")
j2 := newJob("user", "key2", labels.EmptyLabels(), 0, false, 0, "")
require.NoError(t, j2.AppendMeta(&block.Meta{
BlockMeta: tsdb.BlockMeta{
MinTime: 1500002600159,
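TestDefaultGroupKey above now calls the unexported `defaultGroupKey(resolution, labels)` directly. The merged implementation formats the key as `fmt.Sprintf("%d@%v", res, labels.StableHash(lbls))`; the sketch below only illustrates that `<resolution>@<label-hash>` shape, substituting an FNV hash over sorted name=value pairs for the real label hashing in the Prometheus labels package.

```go
// Illustration of the group-key shape produced by defaultGroupKey.
// The hash is a stand-in (FNV-1a over sorted name=value pairs), not the
// hashing used by the real code.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

func groupKeySketch(resolution int64, lbls map[string]string) string {
	names := make([]string, 0, len(lbls))
	for name := range lbls {
		names = append(names, name)
	}
	sort.Strings(names)

	h := fnv.New64a()
	for _, name := range names {
		h.Write([]byte(name))
		h.Write([]byte{'='})
		h.Write([]byte(lbls[name]))
		h.Write([]byte{0xff}) // separator between pairs
	}
	return fmt.Sprintf("%d@%v", resolution, h.Sum64())
}

func main() {
	// Blocks with the same resolution and external labels map to the same key.
	fmt.Println(groupKeySketch(0, map[string]string{"cluster": "prod", "replica": "a"}))
}
```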
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -748,7 +748,7 @@ func (c *MultitenantCompactor) compactUser(ctx context.Context, userID string) e
return err
}

syncer, err := NewMetaSyncer(
syncer, err := newMetaSyncer(
userLogger,
reg,
userBucket,
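compactor.go is the one non-test call site: `compactUser` builds a `newMetaSyncer(...)` per tenant, passing in the now-unexported `deduplicateFilter`. As a rough sketch of what that interface asks of an implementation, with Mimir's block.Meta, block.MetadataFilter, and ulid.ULID reduced to local stand-ins so the snippet compiles on its own:

```go
package main

import "fmt"

// Stand-ins for Mimir types; the real signatures differ.
type blockMeta struct {
	id    string
	level int
}

type metadataFilter interface {
	filter(metas map[string]*blockMeta)
}

// dedupFilter mirrors the deduplicateFilter contract: it filters metas and
// remembers which blocks the last call treated as duplicates.
type dedupFilter struct {
	duplicates []string
}

func (f *dedupFilter) filter(metas map[string]*blockMeta) {
	f.duplicates = f.duplicates[:0]
	// Toy rule: a level-1 block is assumed to be covered by a higher-level one.
	for id, m := range metas {
		if m.level == 1 {
			f.duplicates = append(f.duplicates, id)
			delete(metas, id)
		}
	}
}

func (f *dedupFilter) duplicateIDs() []string { return f.duplicates }

func main() {
	var _ metadataFilter = (*dedupFilter)(nil) // interface satisfaction check

	metas := map[string]*blockMeta{
		"01A": {id: "01A", level: 1},
		"01B": {id: "01B", level: 2},
	}
	f := &dedupFilter{}
	f.filter(metas)
	fmt.Println(f.duplicateIDs()) // prints [01A]
}
```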
4 changes: 2 additions & 2 deletions pkg/compactor/job.go
@@ -33,8 +33,8 @@ type Job struct {
splitNumShards uint32
}

// NewJob returns a new compaction Job.
func NewJob(userID string, key string, lset labels.Labels, resolution int64, useSplitting bool, splitNumShards uint32, shardingKey string) *Job {
// newJob returns a new compaction Job.
func newJob(userID string, key string, lset labels.Labels, resolution int64, useSplitting bool, splitNumShards uint32, shardingKey string) *Job {
return &Job{
userID: userID,
key: key,
4 changes: 2 additions & 2 deletions pkg/compactor/job_test.go
@@ -21,7 +21,7 @@ import (
)

func TestJob_MinCompactionLevel(t *testing.T) {
job := NewJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, "shard-1")
job := newJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, "shard-1")
require.NoError(t, job.AppendMeta(&block.Meta{BlockMeta: tsdb.BlockMeta{ULID: ulid.MustNew(1, nil), Compaction: tsdb.BlockMetaCompaction{Level: 2}}}))
assert.Equal(t, 2, job.MinCompactionLevel())

@@ -106,7 +106,7 @@ func TestJobWaitPeriodElapsed(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
job := NewJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, "shard-1")
job := newJob("user-1", "group-1", labels.EmptyLabels(), 0, true, 2, "shard-1")
for _, b := range testData.jobBlocks {
require.NoError(t, job.AppendMeta(b.meta))
}
2 changes: 1 addition & 1 deletion pkg/compactor/split_merge_grouper.go
@@ -74,7 +74,7 @@ func (g *SplitAndMergeGrouper) Groups(blocks map[ulid.ULID]*block.Meta) (res []*
resolution := job.blocks[0].Thanos.Downsample.Resolution
externalLabels := labels.FromMap(job.blocks[0].Thanos.Labels)

compactionJob := NewJob(
compactionJob := newJob(
g.userID,
groupKey,
externalLabels,