Compact: Display Planned and Running Compactions #7590
base: main
@@ -1,4 +1,6 @@
 github.com/Songmu/gotesplit v0.2.1 h1:qJFvR75nJpeKyMQFwyDtFrcc6zDWhrHAkks7DvM8oLo=
 github.com/Songmu/gotesplit v0.2.1/go.mod h1:sVBfmLT26b1H5VhUpq8cRhCVK75GAmW9c8r2NiK0gzk=
 github.com/jstemmer/go-junit-report v1.0.0 h1:8X1gzZpR+nVQLAht+L/foqOeX2l9DTZoaIPbEQHxsds=
 github.com/jstemmer/go-junit-report v1.0.0/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
 golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
 golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -26,15 +26,16 @@ import (
 // BlocksAPI is a very simple API used by Thanos Block Viewer.
 type BlocksAPI struct {
-	baseAPI          *api.BaseAPI
-	logger           log.Logger
-	globalBlocksInfo *BlocksInfo
-	loadedBlocksInfo *BlocksInfo
+	baseAPI           *api.BaseAPI
+	logger            log.Logger
+	globalBlocksInfo  *BlocksInfo
+	loadedBlocksInfo  *BlocksInfo
+	plannedBlocksInfo *BlocksInfo

 	globalLock, loadedLock sync.Mutex
-	disableCORS            bool
-	bkt                    objstore.Bucket
-	disableAdminOperations bool
+	disableCORS            bool
+	bkt                    objstore.Bucket
+	disableAdminOperations bool
 }

 type BlocksInfo struct {
@@ -77,6 +78,10 @@ func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap ma
 			Blocks: []metadata.Meta{},
 			Label:  label,
 		},
+		plannedBlocksInfo: &BlocksInfo{
+			Blocks: []metadata.Meta{},
+			Label:  label,
+		},
 		disableCORS:            disableCORS,
 		bkt:                    bkt,
 		disableAdminOperations: disableAdminOperations,
@@ -90,6 +95,7 @@ func (bapi *BlocksAPI) Register(r *route.Router, tracer opentracing.Tracer, logg
 	r.Get("/blocks", instr("blocks", bapi.blocks))
 	r.Post("/blocks/mark", instr("blocks_mark", bapi.markBlock))
+	r.Get("/blocks/plan", instr("blocks_plan", bapi.plannedBlocks))
 }

 func (bapi *BlocksAPI) markBlock(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
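For reviewers trying this out: the new route should be reachable next to the existing /blocks endpoint. A small Go check, assuming the default compactor HTTP port (10902) and the usual /api/v1 prefix (both assumptions, not verified against this branch):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed address: a compactor serving the block-viewer API on localhost:10902.
	resp, err := http.Get("http://localhost:10902/api/v1/blocks/plan")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON BlocksInfo payload with the planned blocks
}
```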
@@ -146,6 +152,10 @@ func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiEr
 	return bapi.globalBlocksInfo, nil, nil, func() {}
 }

+func (bapi *BlocksAPI) plannedBlocks(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
+	return bapi.plannedBlocksInfo, nil, nil, func() {}
+}

 func (b *BlocksInfo) set(blocks []metadata.Meta, err error) {
 	if err != nil {
 		// Last view is maintained.
@@ -174,3 +184,8 @@ func (bapi *BlocksAPI) SetLoaded(blocks []metadata.Meta, err error) {
 	bapi.loadedBlocksInfo.set(blocks, err)
 }

+// SetPlanned updates the plan blocks' metadata in the API.
+func (bapi *BlocksAPI) SetPlanned(blocks []metadata.Meta, err error) {
+	bapi.plannedBlocksInfo.set(blocks, err)
+}

Review comment: This should be an append operation.
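If the append suggestion above is taken, a minimal sketch of what it might look like (hypothetical: assumes a dedicated plannedLock alongside the existing globalLock/loadedLock):

```go
// Hypothetical sketch of the reviewer's suggestion: accumulate planned
// blocks instead of replacing the whole view on every call.
func (bapi *BlocksAPI) AppendPlanned(blocks []metadata.Meta, err error) {
	if err != nil {
		// Keep the last good view, mirroring BlocksInfo.set.
		return
	}
	bapi.plannedLock.Lock() // assumed lock, analogous to globalLock/loadedLock
	defer bapi.plannedLock.Unlock()
	bapi.plannedBlocksInfo.Blocks = append(bapi.plannedBlocksInfo.Blocks, blocks...)
}
```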
@@ -65,6 +65,15 @@ type Syncer struct {
 	g singleflight.Group
 }

+func (s *Syncer) SetPlanned(metas []metadata.Meta) error {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	for _, meta := range metas {
+		s.blocks[meta.ULID] = &meta
+	}
+	return nil
+}

 // SyncerMetrics holds metrics tracked by the syncer. This struct and its fields are exported
 // to allow depending projects (eg. Cortex) to implement their own custom syncer while tracking
 // compatible metrics.
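One thing to watch in SetPlanned above: before Go 1.22, `&meta` takes the address of the single per-loop variable, so every map entry would end up pointing at the last element. A version-independent sketch of the loop body:

```go
// Address each slice element directly instead of the loop variable,
// which is safe regardless of the module's Go language version.
for i := range metas {
	s.blocks[metas[i].ULID] = &metas[i]
}
```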
@@ -411,6 +420,12 @@ type Group struct {
 	extensions any
 }

+func (g *Group) Metas() []*metadata.Meta {
+	g.mtx.Lock()
+	defer g.mtx.Unlock()
+	return g.metasByMinTime
+}

 // NewGroup returns a new compaction group.
 func NewGroup(
 	logger log.Logger,
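Metas returns the internal slice, so callers can observe later mutations once the lock is released; a defensive-copy variant (sketch):

```go
// Sketch: hand callers a copy so the group's internal slice cannot be
// read or mutated concurrently after the lock is dropped.
func (g *Group) Metas() []*metadata.Meta {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	out := make([]*metadata.Meta, len(g.metasByMinTime))
	copy(out, g.metasByMinTime)
	return out
}
```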
@@ -795,6 +810,13 @@ type Planner interface {
 	// Plan returns a list of blocks that should be compacted into single one.
 	// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
 	Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error)
+	// Delete before merge: UpdateOnPlanned allows to update the planner with a function that will be called with the planned blocks and an error.
+	UpdateOnPlanned(f func([]metadata.Meta, error))
 }

+// Delete before merge: UpdateOnPlanned allows to update the planner with a function that will be called with the planned blocks and an error.
+func (p *tsdbBasedPlanner) UpdateOnPlanned(f func([]metadata.Meta, error)) {
+	p.updateOnPlanned = f
+}

 type BlockDeletableChecker interface {
@@ -1364,6 +1386,8 @@ type BucketCompactor struct {
 	bkt                            objstore.Bucket
 	concurrency                    int
 	skipBlocksWithOutOfOrderChunks bool
+	blocksAPI                      BlocksAPI
+	extensions                     any
 }

 // NewBucketCompactor creates a new bucket compactor.
@@ -1377,6 +1401,8 @@ func NewBucketCompactor(
 	bkt objstore.Bucket,
 	concurrency int,
 	skipBlocksWithOutOfOrderChunks bool,
+	blocksAPI BlocksAPI,
+	extensions any,
 ) (*BucketCompactor, error) {
 	if concurrency <= 0 {
 		return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
@@ -1393,9 +1419,15 @@ func NewBucketCompactor(
 		bkt,
 		concurrency,
 		skipBlocksWithOutOfOrderChunks,
+		blocksAPI,
+		extensions,
 	)
 }

+type BlocksAPI interface {
+	SetPlanned([]metadata.Meta, error)
+}

 func NewBucketCompactorWithCheckerAndCallback(
 	logger log.Logger,
 	sy *Syncer,
@@ -1408,6 +1440,8 @@ func NewBucketCompactorWithCheckerAndCallback(
 	bkt objstore.Bucket,
 	concurrency int,
 	skipBlocksWithOutOfOrderChunks bool,
+	blocksAPI BlocksAPI,
+	extensions any,
 ) (*BucketCompactor, error) {
 	if concurrency <= 0 {
 		return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
@@ -1424,6 +1458,8 @@ func NewBucketCompactorWithCheckerAndCallback(
 		bkt:                            bkt,
 		concurrency:                    concurrency,
 		skipBlocksWithOutOfOrderChunks: skipBlocksWithOutOfOrderChunks,
+		blocksAPI:                      blocksAPI,
+		extensions:                     extensions,
 	}, nil
 }
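Every constructor now requires a BlocksAPI, so callers that don't serve the block viewer (tests, depending projects such as Cortex) would need something to pass in. A hypothetical no-op implementation satisfying the new interface:

```go
// noopBlocksAPI is a hypothetical stand-in for callers that have no
// block-viewer UI to update; it satisfies the new BlocksAPI interface.
type noopBlocksAPI struct{}

func (noopBlocksAPI) SetPlanned([]metadata.Meta, error) {}
```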
@@ -1455,12 +1491,46 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 	// Set up workers who will compact the groups when the groups are ready.
 	// They will compact available groups until they encounter an error, after which they will stop.
+	var allPlannedMetas []metadata.Meta
+	var plannedMetasMutex sync.Mutex
+
 	for i := 0; i < c.concurrency; i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
 			for g := range groupChan {
-				shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback)
+				groupMetas := g.Metas()

Review comment: Do I need to change the source from g to planner.Plan, and move this out so it is not per group?

Review comment: Read up on goroutines and channels -> concurrency.

+				plannedMetas := make([]metadata.Meta, len(groupMetas))

Review comment: OK, I think we are just setting the blocks of the group instead of the planned blocks? We need the planner.Plan output.

+				for i, meta := range groupMetas {
+					plannedMetas[i] = *meta
+				}
+
+				plannedMetasMutex.Lock()
+				allPlannedMetas = append(allPlannedMetas, plannedMetas...)
+				plannedMetasMutex.Unlock()
+
+				// Delete before merge: Call Syncer.SetPlanned before compaction
+				if err := c.sy.SetPlanned(plannedMetas); err != nil {

Review comment: I think this will be overridden on each group.

+					errChan <- errors.Wrapf(err, "set planned for group %s", g.Key())
+					return
+				}
+
+				// Delete before merge: Call BlocksAPI.SetPlanned to update the API
+				c.blocksAPI.SetPlanned(plannedMetas, nil)
+
+				shouldRerunGroup, compIDs, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback)
+				if err == nil {
+					plannedMetas := make([]metadata.Meta, 0, len(compIDs))
+					for _, id := range compIDs {
+						if meta, ok := c.sy.Metas()[id]; ok {
+							plannedMetas = append(plannedMetas, *meta)
+						}
+					}
+					if err := c.sy.SetPlanned(plannedMetas); err != nil {
+						level.Warn(c.logger).Log("msg", "failed to set planned metas after compaction", "err", err)
+					}
+					c.blocksAPI.SetPlanned(plannedMetas, nil)
+				}
 				if err == nil {
 					if shouldRerunGroup {
 						mtx.Lock()
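One way to address the "overridden on each group" concern above: accumulate and republish a snapshot of everything planned so far, rather than only the current group's metas. A sketch against the variables already introduced in this diff:

```go
// Sketch: publish the accumulated view instead of only this group's metas,
// so later groups don't appear to replace earlier ones in the API.
plannedMetasMutex.Lock()
allPlannedMetas = append(allPlannedMetas, plannedMetas...)
snapshot := append([]metadata.Meta(nil), allPlannedMetas...) // copy under the lock
plannedMetasMutex.Unlock()
c.blocksAPI.SetPlanned(snapshot, nil)
```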
@@ -1531,7 +1601,33 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 		level.Info(c.logger).Log("msg", "start of compactions")

 		// Send all groups found during this pass to the compaction workers.
+		// Delete before merge: Convert map to slice
+		metas := make([]*metadata.Meta, 0, len(c.sy.Metas()))
+		for _, meta := range c.sy.Metas() {
+			metas = append(metas, meta)
+		}
+
+		// Delete before merge: Plan compaction outside of the group loop
+		plan, err := c.planner.Plan(ctx, metas, errChan, c.extensions)
+		if err != nil {
+			return errors.Wrap(err, "plan compaction")
+		}
+
+		// Delete before merge: Convert []*metadata.Meta to []metadata.Meta
+		plannedMetas := make([]metadata.Meta, len(plan))
+		for i, meta := range plan {
+			plannedMetas[i] = *meta
+		}
+
+		// Delete before merge: Call Syncer.SetPlanned before compaction
+		if err := c.sy.SetPlanned(plannedMetas); err != nil {
+			return errors.Wrap(err, "set planned metas")
+		}
+
+		// Delete before merge: Call BlocksAPI.SetPlanned to update the API
+		c.blocksAPI.SetPlanned(plannedMetas, nil)
+
+		// Now proceed with the group loop
 		var groupErrs errutil.MultiError
 	groupLoop:
 		for _, g := range groups {
@@ -1549,6 +1645,11 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 	close(groupChan)
 	wg.Wait()

+	if err := c.sy.SetPlanned(allPlannedMetas); err != nil {
+		return errors.Wrap(err, "set planned metas")
+	}
+	c.blocksAPI.SetPlanned(allPlannedMetas, nil)

 	// Collect any other error reported by the workers, or any error reported
 	// while we were waiting for the last batch of groups to run the compaction.
 	close(errChan)
@@ -25,6 +25,7 @@ type tsdbBasedPlanner struct {
 	ranges []int64

 	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
+	updateOnPlanned  func([]metadata.Meta, error)

Review comment: Maybe

 }

 var _ Planner = &tsdbBasedPlanner{}

Review comment: We should pass blocksAPI.SetPlanned into the UpdateOnPlanned callback?
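The two new pieces line up: UpdateOnPlanned takes a func([]metadata.Meta, error), and BlocksAPI.SetPlanned has exactly that shape, so the wiring the comment asks about could be a one-liner at setup time (sketch; planner and blocksAPI assumed in scope wherever the compactor is built):

```go
// Hypothetical wiring at compactor setup: the planner reports exactly the
// blocks it planned, and the blocks API displays them, without threading
// group metas through the compaction workers.
planner.UpdateOnPlanned(blocksAPI.SetPlanned)
```

This would also make the per-group g.Metas() plumbing in the worker goroutines unnecessary, since the callback fires with the planner's actual output.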