Skip to content

Commit

Permalink
enhance: try compact small segments first if they may compose a full …
Browse files Browse the repository at this point in the history
…segment (milvus-io#37709)

See milvus-io#37234

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
  • Loading branch information
tedxu committed Dec 4, 2024
1 parent fcbed3a commit ff3cd49
Show file tree
Hide file tree
Showing 4 changed files with 360 additions and 317 deletions.
188 changes: 43 additions & 145 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package datacoord
import (
"context"
"fmt"
"sort"
"math"
"sync"
"time"

Expand Down Expand Up @@ -283,23 +283,6 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
return t.allocator.AllocID(ctx)
}

func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
collMeta, err := t.handler.GetCollection(ctx, collectionID)
if err != nil {
log.Warn("failed to get collection", zap.Int64("collectionID", collectionID), zap.Error(err))
return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
}
allDiskIndex := t.meta.indexMeta.AreAllDiskIndex(collectionID, collMeta.Schema)
if allDiskIndex {
// Only if all vector fields index type are DiskANN, recalc segment max size here.
return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024
}
// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
}

func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
t.forceMu.Lock()
defer t.forceMu.Unlock()
Expand Down Expand Up @@ -548,107 +531,57 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
}

buckets := [][]*SegmentInfo{}
// sort segment from large to small
sort.Slice(prioritizedCandidates, func(i, j int) bool {
if prioritizedCandidates[i].getSegmentSize() != prioritizedCandidates[j].getSegmentSize() {
return prioritizedCandidates[i].getSegmentSize() > prioritizedCandidates[j].getSegmentSize()
}
return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID()
})
toUpdate := newSegmentPacker("update", prioritizedCandidates)
toMerge := newSegmentPacker("merge", smallCandidates)
toPack := newSegmentPacker("pack", nonPlannedSegments)

sort.Slice(smallCandidates, func(i, j int) bool {
if smallCandidates[i].getSegmentSize() != smallCandidates[j].getSegmentSize() {
return smallCandidates[i].getSegmentSize() > smallCandidates[j].getSegmentSize()
}
return smallCandidates[i].GetID() < smallCandidates[j].GetID()
})

// Sort non-planned from small to large.
sort.Slice(nonPlannedSegments, func(i, j int) bool {
if nonPlannedSegments[i].getSegmentSize() != nonPlannedSegments[j].getSegmentSize() {
return nonPlannedSegments[i].getSegmentSize() < nonPlannedSegments[j].getSegmentSize()
maxSegs := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt64()
minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64()
compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()
satisfiedSize := int64(float64(expectedSize) * compactableProportion)
expantionRate := Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()
maxLeftSize := expectedSize - satisfiedSize
expectedExpandedSize := int64(float64(expectedSize) * expantionRate)
maxExpandedLeftSize := expectedExpandedSize - satisfiedSize
// 1. Merge small segments if they can make a full bucket
for {
pack, _ := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs)
if len(pack) == 0 {
break
}
return nonPlannedSegments[i].GetID() > nonPlannedSegments[j].GetID()
})

// greedy pick from large segment to small, the goal is to fill each segment to reach 512M
buckets = append(buckets, pack)
}
// 2. greedy pick from large segment to small, the goal is to fill each segment to reach 512M
// we must ensure all prioritized candidates is in a plan
// TODO the compaction selection policy should consider if compaction workload is high
for len(prioritizedCandidates) > 0 {
var bucket []*SegmentInfo
// pop out the first element
segment := prioritizedCandidates[0]
bucket = append(bucket, segment)
prioritizedCandidates = prioritizedCandidates[1:]

// only do single file compaction if segment is already large enough
if segment.getSegmentSize() < expectedSize {
var result []*SegmentInfo
free := expectedSize - segment.getSegmentSize()
maxNum := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt() - 1
prioritizedCandidates, result, free = greedySelect(prioritizedCandidates, free, maxNum)
bucket = append(bucket, result...)
maxNum -= len(result)
if maxNum > 0 {
smallCandidates, result, _ = greedySelect(smallCandidates, free, maxNum)
bucket = append(bucket, result...)
}
}
// since this is priority compaction, we will execute even if there is only segment
log.Info("pick priority candidate for compaction",
zap.Int64("prioritized segmentID", segment.GetID()),
zap.Int64s("picked segmentIDs", lo.Map(bucket, func(s *SegmentInfo, _ int) int64 { return s.GetID() })),
zap.Int64("target size", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })),
zap.Int64("target count", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.GetNumOfRows() })),
)
buckets = append(buckets, bucket)
}

var remainingSmallSegs []*SegmentInfo
// check if there are small candidates left can be merged into large segments
for len(smallCandidates) > 0 {
var bucket []*SegmentInfo
// pop out the first element
segment := smallCandidates[0]
bucket = append(bucket, segment)
smallCandidates = smallCandidates[1:]

var result []*SegmentInfo
free := expectedSize - segment.getSegmentSize()
// for small segment merge, we pick one largest segment and merge as much as small segment together with it
// Why reverse? try to merge as many segments as expected.
// for instance, if a 255M and 255M is the largest small candidates, they will never be merged because of the MinSegmentToMerge limit.
smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1)
bucket = append(bucket, result...)

// only merge if candidate number is large than MinSegmentToMerge or if target size is large enough
targetSize := lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
len(bucket) > 1 && t.isCompactableSegment(targetSize, expectedSize) {
buckets = append(buckets, bucket)
} else {
remainingSmallSegs = append(remainingSmallSegs, bucket...)
for {
// No limit on the remaining size because we want to pack all prioritized candidates
pack, _ := toUpdate.packWith(expectedSize, math.MaxInt64, 0, maxSegs, toMerge)
if len(pack) == 0 {
break
}
buckets = append(buckets, pack)
}

remainingSmallSegs = t.squeezeSmallSegmentsToBuckets(remainingSmallSegs, buckets, expectedSize)

// If there are still remaining small segments, try adding them to non-planned segments.
for _, npSeg := range nonPlannedSegments {
bucket := []*SegmentInfo{npSeg}
targetSize := npSeg.getSegmentSize()
for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
// Note: could also simply use MaxRowNum as limit.
if targetSize+remainingSmallSegs[i].getSegmentSize() <=
int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) {
bucket = append(bucket, remainingSmallSegs[i])
targetSize += remainingSmallSegs[i].getSegmentSize()
remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
}
// 2.+ legacy: squeeze small segments
// Try merge all small segments, and then squeeze
for {
pack, _ := toMerge.pack(expectedSize, math.MaxInt64, minSegs, maxSegs)
if len(pack) == 0 {
break
}
if len(bucket) > 1 {
buckets = append(buckets, bucket)
buckets = append(buckets, pack)
}
remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize)
toMerge = newSegmentPacker("merge", remaining)

// 3. pack remaining small segments with non-planned segments
for {
pack, _ := toMerge.packWith(expectedExpandedSize, maxExpandedLeftSize, minSegs, maxSegs, toPack)
if len(pack) == 0 {
break
}
buckets = append(buckets, pack)
}

tasks := make([]*typeutil.Pair[int64, []int64], len(buckets))
Expand All @@ -666,37 +599,6 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
return tasks
}

func greedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
var result []*SegmentInfo

for i := 0; i < len(candidates); {
candidate := candidates[i]
if len(result) < maxSegment && candidate.getSegmentSize() < free {
result = append(result, candidate)
free -= candidate.getSegmentSize()
candidates = append(candidates[:i], candidates[i+1:]...)
} else {
i++
}
}

return candidates, result, free
}

func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
var result []*SegmentInfo

for i := len(candidates) - 1; i >= 0; i-- {
candidate := candidates[i]
if (len(result) < maxSegment) && (candidate.getSegmentSize() < free) {
result = append(result, candidate)
free -= candidate.getSegmentSize()
candidates = append(candidates[:i], candidates[i+1:]...)
}
}
return candidates, result, free
}

func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
segments := t.meta.GetSegmentsByChannel(channel)
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
Expand Down Expand Up @@ -743,10 +645,6 @@ func isExpandableSmallSegment(segment *SegmentInfo, expectedSize int64) bool {

func isDeltalogTooManySegment(segment *SegmentInfo) bool {
deltaLogCount := GetBinlogCount(segment.GetDeltalogs())
log.Debug("isDeltalogTooManySegment",
zap.Int64("collectionID", segment.CollectionID),
zap.Int64("segmentID", segment.ID),
zap.Int("deltaLogCount", deltaLogCount))
return deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt()
}

Expand Down
Loading

0 comments on commit ff3cd49

Please sign in to comment.