Skip to content

Commit

Permalink
compact: Add BlocksAPI and improve logging
Browse files Browse the repository at this point in the history
1. Add BlocksAPI to Compact method

2. Update BlocksAPI initialization
   - Return plannedBlocksInfo in BlocksAPI constructor

3. Enhance compact package
   - Import blocksAPI from "github.com/thanos-io/thanos/pkg/api/blocks"
   - Add blocksAPI parameter to Group.compact method

This change improves the integration of BlocksAPI with the compaction
process.

Signed-off-by: [amandaguan-ag] <amandaguan1314@gmail.com>
  • Loading branch information
amandaguan-ag committed Aug 23, 2024
1 parent ee15442 commit 61711a7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func runCompact(
}

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
if err := compactor.Compact(ctx, api); err != nil {
return errors.Wrap(err, "compaction")
}

Expand Down
16 changes: 5 additions & 11 deletions pkg/api/blocks/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap ma
Blocks: []metadata.Meta{},
Label: label,
},
plannedBlocksInfo: &BlocksInfo{
Blocks: []metadata.Meta{},
Label: label,
},
disableCORS: disableCORS,
bkt: bkt,
disableAdminOperations: disableAdminOperations,
Expand Down Expand Up @@ -149,16 +153,7 @@ func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiEr
}

func (bapi *BlocksAPI) plannedBlocks(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
ctx := r.Context()


select {
case <-ctx.Done():
return nil, []error{ctx.Err()}, nil, func() {}
default:
}

return bapi.plannedBlocksInfo, nil, nil, func() {}
return bapi.plannedBlocksInfo, nil, nil, func() {}
}

func (b *BlocksInfo) set(blocks []metadata.Meta, err error) {
Expand Down Expand Up @@ -192,6 +187,5 @@ func (bapi *BlocksAPI) SetLoaded(blocks []metadata.Meta, err error) {

// SetPlanned updates the plan blocks' metadata in the API.
func (bapi *BlocksAPI) SetPlanned(blocks []metadata.Meta, err error) {

bapi.plannedBlocksInfo.set(blocks, err)
}
23 changes: 15 additions & 8 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand Down Expand Up @@ -795,9 +796,11 @@ type Planner interface {
// Plan returns a list of blocks that should be compacted into single one.
// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error)
// Delete before merge: UpdateOnPlanned allows to update the planner with a function that will be called with the planned blocks and an error.
UpdateOnPlanned(f func([]metadata.Meta, error))
}

// Delete before merge: UpdateOnPlanned allows to update the planner with a function that will be called with the planned blocks and an error.
func (p *tsdbBasedPlanner) UpdateOnPlanned(f func([]metadata.Meta, error)) {
p.updateOnPlanned = f
}
Expand Down Expand Up @@ -868,7 +871,7 @@ type Compactor interface {

// Compact plans and runs a single compaction against the group. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, blocksAPI *blocksAPI.BlocksAPI) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
cg.compactionRunsStarted.Inc()

subDir := filepath.Join(dir, cg.Key())
Expand Down Expand Up @@ -905,7 +908,7 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp

errChan := make(chan error, 1)
err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) {
shouldRerun, compIDs, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan)
shouldRerun, compIDs, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan, blocksAPI)
return err
}, opentracing.Tags{"group.key": cg.Key()})
errChan <- err
Expand Down Expand Up @@ -1121,7 +1124,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return nil
}

func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (bool, []ulid.ULID, error) {
func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error, blocksAPI *blocksAPI.BlocksAPI) (bool, []ulid.ULID, error) {
cg.mtx.Lock()
defer cg.mtx.Unlock()

Expand All @@ -1140,9 +1143,6 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
var toCompact []*metadata.Meta
if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) {
toCompact, e = planner.Plan(ctx, cg.metasByMinTime, errChan, cg.extensions)
planner.UpdateOnPlanned(func(metas []metadata.Meta, err error) {
level.Info(cg.logger).Log("msg", "planner updated", "metas", len(metas), "err", err)
})
return e
}); err != nil {
return false, nil, errors.Wrap(err, "plan compaction")
Expand All @@ -1151,6 +1151,13 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// Nothing to do.
return false, nil, nil
}
// Delete before merge: Convert []*metadata.Meta to []metadata.Meta
plannedMetas := make([]metadata.Meta, len(toCompact))
for i, meta := range toCompact {
plannedMetas[i] = *meta
}
// Delete before merge: Call blocksAPI interface defined in the github.com/thanos-io/thanos/pkg/api/blocks package
blocksAPI.SetPlanned(plannedMetas, nil)

level.Info(cg.logger).Log("msg", "compaction available and planned", "plan", fmt.Sprintf("%v", toCompact))

Expand Down Expand Up @@ -1436,7 +1443,7 @@ func NewBucketCompactorWithCheckerAndCallback(
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
func (c *BucketCompactor) Compact(ctx context.Context, blocksAPI *blocksAPI.BlocksAPI) (rerr error) {
defer func() {
// Do not remove the compactDir if an error has occurred
// because potentially on the next run we would not have to download
Expand Down Expand Up @@ -1468,7 +1475,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
go func() {
defer wg.Done()
for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback)
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback, blocksAPI)
if err == nil {
if shouldRerunGroup {
mtx.Lock()
Expand Down

0 comments on commit 61711a7

Please sign in to comment.