Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact: Display Planned and Running Compactions #7590

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6ceb3b7
feat(test): Add sleep delay and browser opening for e2einteractive data
amandaguan-ag Aug 2, 2024
112bc0d
feat(blocksAPI): Add plannedBlocksInfo, plannedLock, and route for fe…
amandaguan-ag Aug 2, 2024
1dca384
feat(ui): Add placeholders for PlanBlocksContent, PlanViewWithStatusI…
amandaguan-ag Aug 2, 2024
e7c15bf
chore(build): Update auto-generated React build files
amandaguan-ag Aug 2, 2024
dbe5d56
Add mock data and simply list out planned blocks
amandaguan-ag Aug 8, 2024
d43a456
TODO: fix bapi.planner undefined (type *BlocksAPI has no field or met…
amandaguan-ag Aug 8, 2024
c6cf932
TODO: fix Type 'Block[][]' is not assignable to type 'BlocksPool'.
amandaguan-ag Aug 8, 2024
71b94c6
planner is passed as an argument to the NewBlocksAPI function and the…
amandaguan-ag Aug 8, 2024
72942d0
simplify the Blocks component by removing the redundant checks, The w…
amandaguan-ag Aug 8, 2024
2a486c9
add useState to manage the selected block.
amandaguan-ag Aug 8, 2024
f9777dc
remove SetPlanned placeholder
amandaguan-ag Aug 9, 2024
3c7b555
feat(api): Add mock data for planned blocks
amandaguan-ag Aug 12, 2024
f343a47
remove planner
amandaguan-ag Aug 13, 2024
a039136
Add Global and Planned Blocks pages
amandaguan-ag Aug 13, 2024
08592ae
Update auto-generated React build files
amandaguan-ag Aug 13, 2024
ea98f7c
fix prettier error
amandaguan-ag Aug 13, 2024
394d56e
Move GlobalBlocks and PlannedBlocks components to the blocks directory
amandaguan-ag Aug 13, 2024
7c46fc2
Update auto-generated React build files
amandaguan-ag Aug 13, 2024
28ec713
fix comment indent
amandaguan-ag Aug 13, 2024
97e0886
feat: Add UpdateOnPlanned callback support to tsdbBasedPlanner
amandaguan-ag Aug 15, 2024
06cfa3a
define the UpdateOnPlanned method on the Planner interface, rather th…
amandaguan-ag Aug 20, 2024
87134d3
call UpdateOnPlanned after each Plan call.
amandaguan-ag Aug 20, 2024
ee15442
Replace mock data with plannedBlocksInfo
amandaguan-ag Aug 21, 2024
ac2253a
compact: Add BlocksAPI and improve logging
amandaguan-ag Aug 23, 2024
2f19b48
Enhance BucketCompactor with tracking and synchronization features
amandaguan-ag Aug 26, 2024
2c7d008
update compact_e2e_test with api field in NewBucketCompactor
amandaguan-ag Aug 27, 2024
82a07e7
replace g.Metas() with planner.Plan
amandaguan-ag Aug 29, 2024
b4ffc90
accumulate all the planned metas from all groups before calling SetPl…
amandaguan-ag Aug 29, 2024
1046dfa
add mergeFunc to NewBucketCompactor
amandaguan-ag Aug 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .bin/gopls
Binary file not shown.
Binary file added .bin/gotesplit-v0.2.1
Binary file not shown.
Binary file added .bin/staticcheck
Binary file not shown.
2 changes: 2 additions & 0 deletions .bingo/gotesplit.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/Songmu/gotesplit v0.2.1 h1:qJFvR75nJpeKyMQFwyDtFrcc6zDWhrHAkks7DvM8oLo=
github.com/Songmu/gotesplit v0.2.1/go.mod h1:sVBfmLT26b1H5VhUpq8cRhCVK75GAmW9c8r2NiK0gzk=
github.com/jstemmer/go-junit-report v1.0.0 h1:8X1gzZpR+nVQLAht+L/foqOeX2l9DTZoaIPbEQHxsds=
github.com/jstemmer/go-junit-report v1.0.0/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
16 changes: 10 additions & 6 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

levels, err := compactions.levels(conf.maxCompactionLevel)
if err != nil {
return errors.Wrap(err, "get compaction levels")
}

tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
Expand Down Expand Up @@ -288,6 +295,7 @@ func runCompact(
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
api.SetLoaded(blocks, err)
})

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass in blocksAPI.SetPlanned into UpdateOnPlanned callback?

sy, err = compact.NewMetaSyncer(
logger,
reg,
Expand All @@ -303,11 +311,6 @@ func runCompact(
}
}

levels, err := compactions.levels(conf.maxCompactionLevel)
if err != nil {
return errors.Wrap(err, "get compaction levels")
}

if conf.maxCompactionLevel < compactions.maxLevel() {
level.Warn(logger).Log("msg", "Max compaction level is lower than should be", "current", conf.maxCompactionLevel, "default", compactions.maxLevel())
}
Expand Down Expand Up @@ -371,7 +374,6 @@ func runCompact(
)
var planner compact.Planner

tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
largeIndexFilterPlanner := compact.WithLargeTotalIndexSizeFilter(
tsdbPlanner,
insBkt,
Expand All @@ -394,6 +396,8 @@ func runCompact(
insBkt,
conf.compactionConcurrency,
conf.skipBlockWithOutOfOrderChunks,
api,
nil,
)
if err != nil {
return errors.Wrap(err, "create bucket compactor")
Expand Down
29 changes: 22 additions & 7 deletions pkg/api/blocks/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import (

// BlocksAPI is a very simple API used by Thanos Block Viewer.
type BlocksAPI struct {
baseAPI *api.BaseAPI
logger log.Logger
globalBlocksInfo *BlocksInfo
loadedBlocksInfo *BlocksInfo
baseAPI *api.BaseAPI
logger log.Logger
globalBlocksInfo *BlocksInfo
loadedBlocksInfo *BlocksInfo
plannedBlocksInfo *BlocksInfo

globalLock, loadedLock sync.Mutex
disableCORS bool
bkt objstore.Bucket
disableAdminOperations bool
disableCORS bool
bkt objstore.Bucket
disableAdminOperations bool
}

type BlocksInfo struct {
Expand Down Expand Up @@ -77,6 +78,10 @@ func NewBlocksAPI(logger log.Logger, disableCORS bool, label string, flagsMap ma
Blocks: []metadata.Meta{},
Label: label,
},
plannedBlocksInfo: &BlocksInfo{
Blocks: []metadata.Meta{},
Label: label,
},
disableCORS: disableCORS,
bkt: bkt,
disableAdminOperations: disableAdminOperations,
Expand All @@ -90,6 +95,7 @@ func (bapi *BlocksAPI) Register(r *route.Router, tracer opentracing.Tracer, logg

r.Get("/blocks", instr("blocks", bapi.blocks))
r.Post("/blocks/mark", instr("blocks_mark", bapi.markBlock))
r.Get("/blocks/plan", instr("blocks_plan", bapi.plannedBlocks))
}

func (bapi *BlocksAPI) markBlock(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Expand Down Expand Up @@ -146,6 +152,10 @@ func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiEr
return bapi.globalBlocksInfo, nil, nil, func() {}
}

func (bapi *BlocksAPI) plannedBlocks(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
return bapi.plannedBlocksInfo, nil, nil, func() {}
}

func (b *BlocksInfo) set(blocks []metadata.Meta, err error) {
if err != nil {
// Last view is maintained.
Expand Down Expand Up @@ -174,3 +184,8 @@ func (bapi *BlocksAPI) SetLoaded(blocks []metadata.Meta, err error) {

bapi.loadedBlocksInfo.set(blocks, err)
}

// SetPlanned updates the plan blocks' metadata in the API.
func (bapi *BlocksAPI) SetPlanned(blocks []metadata.Meta, err error) {
bapi.plannedBlocksInfo.set(blocks, err)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be a append operation

}
105 changes: 103 additions & 2 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ type Syncer struct {
g singleflight.Group
}

func (s *Syncer) SetPlanned(metas []metadata.Meta) error {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, meta := range metas {
s.blocks[meta.ULID] = &meta
}
return nil
}

// SyncerMetrics holds metrics tracked by the syncer. This struct and its fields are exported
// to allow depending projects (eg. Cortex) to implement their own custom syncer while tracking
// compatible metrics.
Expand Down Expand Up @@ -411,6 +420,12 @@ type Group struct {
extensions any
}

func (g *Group) Metas() []*metadata.Meta {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.metasByMinTime
}

// NewGroup returns a new compaction group.
func NewGroup(
logger log.Logger,
Expand Down Expand Up @@ -795,6 +810,13 @@ type Planner interface {
// Plan returns a list of blocks that should be compacted into single one.
// The blocks can be overlapping. The provided metadata has to be ordered by minTime.
Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error)
// Delete before merge: UpdateOnPlanned allows to update the planner with a function that will be called with the planned blocks and an error.
UpdateOnPlanned(f func([]metadata.Meta, error))
}

// Delete before merge: UpdateOnPlanned allows to update the planner with a function that will be called with the planned blocks and an error.
func (p *tsdbBasedPlanner) UpdateOnPlanned(f func([]metadata.Meta, error)) {
p.updateOnPlanned = f
}

type BlockDeletableChecker interface {
Expand Down Expand Up @@ -1364,6 +1386,8 @@ type BucketCompactor struct {
bkt objstore.Bucket
concurrency int
skipBlocksWithOutOfOrderChunks bool
blocksAPI BlocksAPI
extensions any
}

// NewBucketCompactor creates a new bucket compactor.
Expand All @@ -1377,6 +1401,8 @@ func NewBucketCompactor(
bkt objstore.Bucket,
concurrency int,
skipBlocksWithOutOfOrderChunks bool,
blocksAPI BlocksAPI,
extensions any,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
Expand All @@ -1393,9 +1419,15 @@ func NewBucketCompactor(
bkt,
concurrency,
skipBlocksWithOutOfOrderChunks,
blocksAPI,
extensions,
)
}

type BlocksAPI interface {
SetPlanned([]metadata.Meta, error)
}

func NewBucketCompactorWithCheckerAndCallback(
logger log.Logger,
sy *Syncer,
Expand All @@ -1408,6 +1440,8 @@ func NewBucketCompactorWithCheckerAndCallback(
bkt objstore.Bucket,
concurrency int,
skipBlocksWithOutOfOrderChunks bool,
blocksAPI BlocksAPI,
extensions any,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency)
Expand All @@ -1424,6 +1458,8 @@ func NewBucketCompactorWithCheckerAndCallback(
bkt: bkt,
concurrency: concurrency,
skipBlocksWithOutOfOrderChunks: skipBlocksWithOutOfOrderChunks,
blocksAPI: blocksAPI,
extensions: extensions,
}, nil
}

Expand Down Expand Up @@ -1455,12 +1491,46 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {

// Set up workers who will compact the groups when the groups are ready.
// They will compact available groups until they encounter an error, after which they will stop.
var allPlannedMetas []metadata.Meta
var plannedMetasMutex sync.Mutex

for i := 0; i < c.concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback)
groupMetas := g.Metas()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do i need change the source from g to planner.plan, move out not for group

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read go routine, and channels ->concurrency

plannedMetas := make([]metadata.Meta, len(groupMetas))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think we are just setting blocks of the group, instead of the planned blocks? We need planner.Plan output

for i, meta := range groupMetas {
plannedMetas[i] = *meta
}

plannedMetasMutex.Lock()
allPlannedMetas = append(allPlannedMetas, plannedMetas...)
plannedMetasMutex.Unlock()

// Delete before merge: Call Syncer.SetPlanned before compaction
if err := c.sy.SetPlanned(plannedMetas); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will be overriden on each group

errChan <- errors.Wrapf(err, "set planned for group %s", g.Key())
return
}

// Delete before merge: Call BlocksAPI.SetPlanned to update the API
c.blocksAPI.SetPlanned(plannedMetas, nil)

shouldRerunGroup, compIDs, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback)
if err == nil {
plannedMetas := make([]metadata.Meta, 0, len(compIDs))
for _, id := range compIDs {
if meta, ok := c.sy.Metas()[id]; ok {
plannedMetas = append(plannedMetas, *meta)
}
}
if err := c.sy.SetPlanned(plannedMetas); err != nil {
level.Warn(c.logger).Log("msg", "failed to set planned metas after compaction", "err", err)
}
c.blocksAPI.SetPlanned(plannedMetas, nil)
}
if err == nil {
if shouldRerunGroup {
mtx.Lock()
Expand Down Expand Up @@ -1531,7 +1601,33 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {

level.Info(c.logger).Log("msg", "start of compactions")

// Send all groups found during this pass to the compaction workers.
// Delete before merge: Convert map to slice
metas := make([]*metadata.Meta, 0, len(c.sy.Metas()))
for _, meta := range c.sy.Metas() {
metas = append(metas, meta)
}

// Delete before merge: Plan compaction outside of the group loop
plan, err := c.planner.Plan(ctx, metas, errChan, c.extensions)
if err != nil {
return errors.Wrap(err, "plan compaction")
}

// Delete before merge: Convert []*metadata.Meta to []metadata.Meta
plannedMetas := make([]metadata.Meta, len(plan))
for i, meta := range plan {
plannedMetas[i] = *meta
}

// Delete before merge: Call Syncer.SetPlanned before compaction
if err := c.sy.SetPlanned(plannedMetas); err != nil {
return errors.Wrap(err, "set planned metas")
}

// Delete before merge: Call BlocksAPI.SetPlanned to update the API
c.blocksAPI.SetPlanned(plannedMetas, nil)

// Now proceed with the group loop
var groupErrs errutil.MultiError
groupLoop:
for _, g := range groups {
Expand All @@ -1549,6 +1645,11 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
close(groupChan)
wg.Wait()

if err := c.sy.SetPlanned(allPlannedMetas); err != nil {
return errors.Wrap(err, "set planned metas")
}
c.blocksAPI.SetPlanned(allPlannedMetas, nil)

// Collect any other error reported by the workers, or any error reported
// while we were waiting for the last batch of groups to run the compaction.
close(errChan)
Expand Down
11 changes: 10 additions & 1 deletion pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg

planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 10, 10)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true)
blocksAPI := &metaFetcherWrapper{metaFetcher}
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true, blocksAPI, mergeFunc)
testutil.Ok(t, err)

// Compaction on empty should not fail.
Expand Down Expand Up @@ -542,3 +543,11 @@ func listBlocksMarkedForDeletion(ctx context.Context, bkt objstore.Bucket) ([]ul
})
return rem, err
}

type metaFetcherWrapper struct {
*block.MetaFetcher
}

func (m *metaFetcherWrapper) SetPlanned(metas []metadata.Meta, err error) {
// Implement the logic as needed
}
1 change: 1 addition & 0 deletions pkg/compact/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type tsdbBasedPlanner struct {
ranges []int64

noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
updateOnPlanned func([]metadata.Meta, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe listener instead of updateOnPlanned?

}

var _ Planner = &tsdbBasedPlanner{}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ui/react-app/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { QueryParamProvider } from 'use-query-params';
import useMedia from 'use-media';

import { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList, NotFound } from './pages';
import { GlobalBlocks, PlannedBlocks } from './thanos/pages/blocks/Blocks';
import PathPrefixProps from './types/PathPrefixProps';
import ThanosComponentProps from './thanos/types/ThanosComponentProps';
import Navigation from './thanos/Navbar';
Expand All @@ -17,7 +18,7 @@ const defaultRouteConfig: { [component: string]: string } = {
query: '/graph',
rule: '/alerts',
bucket: '/blocks',
compact: '/loaded',
compact: '/global-blocks',
store: '/loaded',
};

Expand Down Expand Up @@ -61,6 +62,8 @@ const App: FC<PathPrefixProps & ThanosComponentProps> = ({ pathPrefix, thanosCom
<Stores path="/stores" pathPrefix={pathPrefix} />
<Blocks path="/blocks" pathPrefix={pathPrefix} />
<Blocks path="/loaded" pathPrefix={pathPrefix} view="loaded" />
<GlobalBlocks path="/global-blocks" pathPrefix={pathPrefix} />
<PlannedBlocks path="/planned-blocks" pathPrefix={pathPrefix} />
<NotFound pathPrefix={pathPrefix} default defaultRoute={defaultRouteConfig[thanosComponent]} />
</Router>
</QueryParamProvider>
Expand Down
3 changes: 2 additions & 1 deletion pkg/ui/react-app/src/thanos/Navbar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ const navConfig: { [component: string]: (NavConfig | NavDropDown)[] } = {
},
],
compact: [
{ name: 'Global Blocks', uri: '/blocks' },
{ name: 'Global Blocks', uri: '/global-blocks' },
{ name: 'Planned Blocks', uri: '/planned-blocks' },
{ name: 'Loaded Blocks', uri: '/loaded' },
{
name: 'Status',
Expand Down
9 changes: 8 additions & 1 deletion pkg/ui/react-app/src/thanos/pages/blocks/Blocks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ interface BlocksProps {

export const Blocks: FC<RouteComponentProps & PathPrefixProps & BlocksProps> = ({ pathPrefix = '', view = 'global' }) => {
const { response, error, isLoading } = useFetch<BlockListProps>(
`${pathPrefix}/api/v1/blocks${view ? '?view=' + view : ''}`
`${pathPrefix}/api/v1/blocks${view === 'planned' ? '/plan' : view ? '?view=' + view : ''}`
);
const { status: responseStatus } = response;
const badResponse = responseStatus !== 'success' && responseStatus !== 'start fetching';
Expand All @@ -221,5 +221,12 @@ export const Blocks: FC<RouteComponentProps & PathPrefixProps & BlocksProps> = (
/>
);
};
export const GlobalBlocks: React.FC<RouteComponentProps & PathPrefixProps> = ({ pathPrefix }) => {
return <Blocks view="global" pathPrefix={pathPrefix} />;
};

export const PlannedBlocks: React.FC<RouteComponentProps & PathPrefixProps> = ({ pathPrefix }) => {
return <Blocks view="planned" pathPrefix={pathPrefix} />;
};

export default Blocks;
Loading